from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Optional

import jwt
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pydantic import BaseModel
from pydantic.networks import EmailStr
from sqlmodel import SQLModel, Field, Session, select

from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from backend.config import pwd_context, get_session_db


### TOKEN MODELS ###

# Shape of a token response: the encoded token plus its type string.
class Token(BaseModel):
    access_token: str
    token_type: str


# Optional claims that may be carried inside a token payload.
class TokenData(BaseModel):
    username: Optional[str] = None
    role: Optional[str] = None
    status: Optional[str] = None


### ENUMS ###

# Roles a user can hold; string-valued so members compare equal to plain str.
class Role(str, Enum):
    user = "user"
    admin = "admin"
    guest = "guest"
    mod = "mod"


# Account states; string-valued so members compare equal to plain str.
class Status(str, Enum):
    active = "active"
    banned = "banned"
    suspended = "suspended"


### USER MODELS ###

# Fields shared by the non-table user schemas below.
class UserBase(SQLModel):
    username: Optional[str] = None
    user_id: Optional[int] = None
    role: Optional[Role] = None
    status: Optional[Status] = None


# User schema that additionally carries the password hash.
class UserInDb(UserBase):
    hashed_password: str | None = None


# User schema without the password hash.
class UserPublic(UserBase):
    pass


# Payload for creating a user; carries the plain-text password.
class UserCreate(BaseModel):
    username: Optional[str] = None
    role: Optional[Role] = None
    email: EmailStr | None = None
    status: Optional[Status] = None
    password: str | None = None


### DATABASE MODEL ###

# Table-backed user row stored in the "users" table.
class DBUser(SQLModel, table=True):
    # Optional: SQLModel would also assign a table name automatically.
    __tablename__ = "users"

    user_id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(index=True, nullable=False)
    hashed_password: str = Field(nullable=False)
    role: Role = Field(default=Role.user)
    status: Status = Field(default=Status.active)


### AUTH ###

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    Delegates to ``pwd_context.verify`` from ``backend.config``.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context.hash``."""
    return pwd_context.hash(password)


def authenticate_user(
    session: Annotated[Session, Depends(get_session_db)],
    username: str,
    password: str,
) -> UserInDb | None:
    """Look up ``username`` and check ``password`` against the stored hash.

    Returns the first matching ``DBUser`` row, or ``None`` when no row has
    that username or the password does not verify.

    NOTE(review): the annotation says ``UserInDb`` but the object returned is
    a ``DBUser`` row - confirm which one callers expect.
    """
    statement = select(DBUser).where(DBUser.username == username)
    db_user = session.exec(statement).first()
    if not db_user or not verify_password(password, db_user.hashed_password):
        return None
    return db_user


def create_access_token(
    data: dict,
    expires_delta: Optional[timedelta] = timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    ),
) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim added.

    Args:
        data: Claims to embed. The dict is copied, not mutated.
        expires_delta: Token lifetime. ``None`` falls back to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes instead of failing on
            ``datetime + None``.

    Returns:
        The encoded token, signed with ``SECRET_KEY``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    # NOTE(review): "HS256" is hard-coded here and in get_current_user while
    # the imported ALGORITHM setting is unused - confirm they agree.
    return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")


async def get_user(
    session: Annotated[Session, Depends(get_session_db)],
    username: str,
) -> UserInDb | None:
    """Return the first ``DBUser`` row whose username matches, else ``None``."""
    statement = select(DBUser).where(DBUser.username == username)
    return session.exec(statement).first()


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[Session, Depends(get_session_db)],
) -> UserPublic:
    """Resolve the bearer ``token`` to the user row it names.

    The token is decoded with ``SECRET_KEY``; its ``sub`` claim is used as the
    username for the lookup.

    Raises:
        HTTPException: 401 "Invalid credentials" when the token cannot be
            decoded, has no ``sub`` claim, or names no existing user.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise PyJWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError as exc:
        raise credentials_exception from exc

    username: Optional[str] = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = await get_user(session, username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[UserInDb, Depends(get_current_user)],
) -> UserPublic:
    """Return ``current_user`` unless the account is banned.

    Raises:
        HTTPException: 400 "Inactive user" when the status is ``banned``.
    """
    # NOTE(review): only `banned` is rejected; `suspended` users pass through.
    # Confirm that is intended.
    if current_user.status == Status.banned:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user