backend/auth/models.py
2025-05-05 19:06:39 +03:00

135 lines
3.9 KiB
Python

from enum import Enum
from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from backend.config import pwd_context, get_session_db
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from fastapi import Depends, HTTPException
from typing import Annotated, Optional
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
import jwt
from sqlmodel import SQLModel, Field, Session, select
from pydantic.networks import EmailStr
class Token(BaseModel):
access_token: str
token_type: str
### ENUMS ###
class Role(str, Enum):
user = "user"
admin = "admin"
guest = "guest"
mod = "mod"
class Status(str, Enum):
active = "active"
banned = "banned"
suspended = "suspended"
### KULLANICI MODELLERİ ###
class UserBase(SQLModel):
username: Optional[str] = None
user_id: Optional[int] = None
role: Optional[Role] = None
status: Optional[Status] = None
class UserInDb(UserBase):
hashed_password: str | None = None
class UserPublic(UserBase):
pass
class UserCreate(BaseModel):
username: Optional[str] = None
role: Optional[Role] = None
email : EmailStr | None = None
status: Optional[Status] = None
password : str | None = None
### VERİTABANI MODELİ ###
class DBUser(SQLModel, table=True):
__tablename__ = "users" # opsiyonel, sqlmodel bunu otomatik de atar
user_id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False)
hashed_password: str = Field(nullable=False)
role: Role = Field(default=Role.user)
status: Status = Field(default=Status.active)
### AUTH ###
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def authenticate_user(
session: Annotated[Session, Depends(get_session_db)],
username: str,
password: str
) -> UserInDb | None:
statement = select(DBUser).where(DBUser.username == username)
result = session.exec(statement).first()
if not result or not verify_password(password, result.hashed_password):
return None
return result
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
return encoded_jwt
async def get_user(
session: Annotated[Session, Depends(get_session_db)],
username: str
) -> UserInDb | None:
statement = select(DBUser).where(DBUser.username == username)
result = session.exec(statement).first()
return result
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Annotated[Session, Depends(get_session_db)]
) -> UserPublic:
credentials_exception = HTTPException(
status_code=401,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username: Optional[str] = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = await get_user(session, username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[UserInDb, Depends(get_current_user)]
) -> UserPublic:
if current_user.status == Status.banned:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user