add updates to show bedir

This commit is contained in:
Osman Faruk Bayram 2025-05-05 20:38:37 +03:00
parent d5588dd055
commit 1d3d74d9d6
6 changed files with 118 additions and 210 deletions

View file

@ -1,135 +1,27 @@
from enum import Enum
from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from backend.config import pwd_context, get_session_db
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from fastapi import Depends, HTTPException
from typing import Annotated, Optional
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
import jwt
from sqlmodel import SQLModel, Field, Session, select
from pydantic.networks import EmailStr
from sqlalchemy import Column, Integer, String, Enum, DateTime
import enum, datetime
class Token(BaseModel):
access_token: str
token_type: str
from ..config import Base
### ENUMS ###
class Role(str, Enum):
user = "user"
class Role(str, enum.Enum):
admin = "admin"
guest = "guest"
user = "user"
mod = "mod"
class Status(str, Enum):
active = "active"
class Status(str, enum.Enum):
banned = "banned"
active = "active"
suspended = "suspended"
### KULLANICI MODELLERİ ###
class UserBase(SQLModel):
username: Optional[str] = None
user_id: Optional[int] = None
role: Optional[Role] = None
status: Optional[Status] = None
class UserInDb(UserBase):
hashed_password: str | None = None
class UserPublic(UserBase):
pass
class UserCreate(BaseModel):
username: Optional[str] = None
role: Optional[Role] = None
email : EmailStr | None = None
status: Optional[Status] = None
password : str | None = None
### VERİTABANI MODELİ ###
class DBUser(SQLModel, table=True):
__tablename__ = "users" # opsiyonel, sqlmodel bunu otomatik de atar
user_id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False)
hashed_password: str = Field(nullable=False)
role: Role = Field(default=Role.user)
status: Status = Field(default=Status.active)
### AUTH ###
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def authenticate_user(
session: Annotated[Session, Depends(get_session_db)],
username: str,
password: str
) -> UserInDb | None:
statement = select(DBUser).where(DBUser.username == username)
result = session.exec(statement).first()
if not result or not verify_password(password, result.hashed_password):
return None
return result
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
return encoded_jwt
async def get_user(
session: Annotated[Session, Depends(get_session_db)],
username: str
) -> UserInDb | None:
statement = select(DBUser).where(DBUser.username == username)
result = session.exec(statement).first()
return result
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Annotated[Session, Depends(get_session_db)]
) -> UserPublic:
credentials_exception = HTTPException(
status_code=401,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username: Optional[str] = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = await get_user(session, username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[UserInDb, Depends(get_current_user)]
) -> UserPublic:
if current_user.status == Status.banned:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
class User(Base):
__tablename__ = "users"
user_id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
name = Column(String)
surname = Column(String)
hashedPassword = Column(String)
email = Column(String, unique=True)
role = Column(Enum(Role), default=Role.user)
status = Column(Enum(Status), default=Status.active)
bio = Column(String(144))
created_date = Column(DateTime, default=datetime.datetime.utcnow)

View file

@ -1,15 +1,14 @@
from fastapi import APIRouter, Depends, HTTPException, status
from .models import Token, UserPublic
from .models import authenticate_user, create_access_token
from datetime import timedelta
from ..auth.models import get_password_hash, verify_password
from typing import Annotated
from sqlmodel import Session
from ..config import get_session_db
from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm
from .models import UserCreate, DBUser
import os
from fastapi import APIRouter, HTTPException
import bcrypt
import jwt
from fastapi import Depends
from sqlalchemy.orm import Session
from .models import User
from .schemas import UserCreate, UserOut, UserLogin
from ..config import get_db
router = APIRouter(
prefix="/auth",
@ -18,41 +17,40 @@ router = APIRouter(
dependencies=[],
)
@router.post('/login')
async def login_for_access_token(
form_data : Annotated[OAuth2PasswordRequestForm, Depends()],
session : Annotated[Session, Depends(get_session_db)],
) -> Token:
def create_token(user: User):
return jwt.encode({"sub": user.username}, os.getenv("SECRET_KEY"), algorithm=os.getenv("ALGORITHM"))
user = authenticate_user(session, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=30)
access_token = create_access_token(
data={"sub": user.username, "role": user.role, 'status': user.status}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
def verify_token(token: str):
try:
data = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
return data.get("sub")
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Token expired")
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid token")
@router.post('/register', response_model=UserPublic)
async def create_user(
session : Annotated[Session, Depends(get_session_db)],
user : Annotated[UserCreate, Depends()]
):
user_dict = user.dict()
print(user.password)
user_dict['hashed_password'] = get_password_hash(user.password)
print (user_dict['hashed_password'])
@router.post("/register")
def register(user: UserCreate, db: Session = Depends(get_db)):
if db.query(User).filter_by(username=user.username).first():
raise HTTPException(400, "Username taken")
hashed = bcrypt.hashpw(user.password.encode(), bcrypt.gensalt()).decode()
db_user = User(**user.model_dump(exclude={"password"}), hashedPassword=hashed)
db.add(db_user)
db.commit()
return {"msg": "User created"}
if not verify_password(user.password, user_dict['hashed_password']):
raise HTTPException(status_code=400, detail="Password hashing failed")
@router.post("/login")
def login(user: UserLogin, db: Session = Depends(get_db)):
db_user = db.query(User).filter_by(username=user.username).first()
if not db_user or not bcrypt.checkpw(user.password.encode(), db_user.hashedPassword.encode()):
raise HTTPException(401, "Invalid creds")
return {"token": create_token(db_user)}
db_user = DBUser.model_validate(user_dict)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@router.get("/me", response_model=UserOut)
def get_me(token: str, db: Session = Depends(get_db)):
username = verify_token(token)
if not username:
raise HTTPException(401, "Invalid token")
user = db.query(User).filter_by(username=username).first()
return user

23
auth/schemas.py Normal file
View file

@ -0,0 +1,23 @@
from pydantic import BaseModel, EmailStr
from .models import Role, Status
class UserCreate(BaseModel):
username: str
name: str
surname: str
password: str
email: EmailStr
bio: str = ""
class UserOut(BaseModel):
username: str
name: str
surname: str
email: EmailStr
role: Role
status: Status
bio: str
class UserLogin(BaseModel):
username: str
password: str