from enum import Enum import random import smtplib from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES ,pwd_context, get_session_db, Base from datetime import datetime, timedelta, timezone from pydantic import BaseModel from fastapi import Depends, HTTPException from typing import Annotated from fastapi.security import OAuth2PasswordBearer from pydantic.networks import EmailStr from sqlalchemy import Integer, DateTime from sqlalchemy.orm import Session, relationship, mapped_column, Mapped from sqlalchemy.dialects.postgresql import ARRAY from email.message import EmailMessage import jwt class Token(BaseModel): access_token: str token_type: str ### ENUMS ### class Role(str, Enum): user = "user" admin = "admin" guest = "guest" mod = "mod" class Status(str, Enum): active = "active" banned = "banned" suspended = "suspended" ### KULLANICI MODELLERİ ### sqlalchemy ve pydantic modelleri farklıdır class UserBase(BaseModel): #bu bir veri tabanı modeli değil !!!! lütfen dikkat et username: str | None = None #Option yerine Union kullanabilirsin role: Role | None = None status: Status | None = None class UserInDb(UserBase): user_id: int | None = None email: EmailStr | None = None hashed_password: str | None = None class UserPublic(BaseModel): username : str | None = None role : Role | None = None status : Status | None = None class UserCreate(BaseModel): username: str | None = None role: Role | None = None email : EmailStr | None = None status: Status | None = None password : str | None = None ### VERİTABANI MODELİ ### class DBUser(Base): __tablename__ = "users_table" user_id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True) username : Mapped[str] = mapped_column(unique=True, index=True, nullable=False) email : Mapped[str] = mapped_column(unique=True, index=True, nullable=False) hashed_password : Mapped[str] = mapped_column(nullable=False) role : Mapped[Role] = mapped_column(default=Role.user) status : Mapped[Status] = mapped_column(default=Status.active) created_date : Mapped[datetime] = mapped_column(DateTime, default=datetime.now()) #datetime.datetime -> python, DateTime -> sqlalchemy bio : Mapped[str] = mapped_column(default="No bio") follow_users : Mapped[list[int]] = mapped_column(ARRAY(Integer), default=[]) # takip edilen kullanıcılar # -> buralar diğer tablolar ile olan ilişkiler from ..items.models import Items items : Mapped[list['Items']] = relationship("Items", back_populates="user", cascade="all, delete-orphan", lazy='select') ### AUTH ### oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") ### SERVİSLER ### def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: return pwd_context.hash(password) def authenticate_user( session: Annotated[Session, Depends(get_session_db)], username: str, password: str ) -> UserInDb | None: user = session.query(DBUser).filter(DBUser.username == username).first() if user is None or not verify_password(password, user.hashed_password): #sqlalchemy'de bu şekilde kontrol ediliyor None ile return None return user def create_access_token( data: dict, expires_delta: Annotated[timedelta, None] = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES), ) -> str: to_encode = data.copy() expire = datetime.now(timezone.utc) + expires_delta to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_user( session: Annotated[Session, Depends(get_session_db)], username: str ) -> UserInDb | None: user = session.query(DBUser).filter(DBUser.username == username).first() return user async def get_current_user( token: Annotated[str, Depends(oauth2_scheme)], session: Annotated[Session, Depends(get_session_db)] ) -> UserPublic: credentials_exception = HTTPException( status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username : str | None = payload.get("sub") user = UserInDb.model_validate(payload) if username is None: raise credentials_exception except jwt.PyJWTError: raise credentials_exception user = await get_user(session, username) if user is None: raise credentials_exception return user async def get_current_active_user( current_user: Annotated[UserInDb, Depends(get_current_user)] ) -> UserPublic: if current_user.status == Status.banned: raise HTTPException(status_code=400, detail="Inactive user") return current_user ### Kullanıcı kaydı def register_user( session: Annotated[Session, Depends(get_session_db)], user: Annotated[UserCreate, Depends()] ) -> UserPublic: user_dict = user.dict() # kullanıcıdan gelen verileri alıyoruz çunku şifreyi hashleyeceğiz user_dict['hashed_password'] = get_password_hash(user.password) # şifreyi hashliyoruz if not verify_password(user.password, user_dict['hashed_password']): raise HTTPException(status_code=400, detail="Password hashing failed") # şifre hashleme işlemi başarısız oldu # Kullanıcı adı ve e-posta adresinin benzersiz olduğunu kontrol et existing_user = session.query(DBUser).filter( (DBUser.username == user.username) | (DBUser.email == user.email) ).first() if existing_user: raise HTTPException(status_code=400, detail="Username or email already registered") user_dict['created_date'] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") # kullanıcı oluşturulma tarihi user_dict.pop('password') ##password'u veri tabanına eklemiyoruz zaten sınıfımızda tanımlı değil hata verir db_user = DBUser(**user_dict) #alchemy ile pydantic modelleri farklıdır bir birine session.add(db_user) # donuşum yaparken dikkat et session.commit() session.refresh(db_user) return db_user def find_user_w_email( session: Annotated[Session, Depends(get_session_db)], email: EmailStr | None = None, ): exist_user = session.query(DBUser).filter(DBUser.email == email).first() #email ile kullanıcıyı bul if exist_user is None: raise HTTPException(status_code=400, detail="User not found") if exist_user.status == Status.banned: raise HTTPException(status_code=400, detail="Inactive user") return True def send_password_to_email( session: Annotated[Session, Depends(get_session_db)], email: EmailStr | None = None, ) -> str: msg = EmailMessage() #obje oluştur msg['Subject'] = 'Password Reset' msg['From'] = 'hansneiumann@gmail.com' msg['To'] = email veritification_code = generate_password_reset_number() msg.set_content(veritification_code) with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp: smtp.login("hansneiumann@gmail.com", "rwaq mbil lzut dgja") smtp.send_message(msg) update_password_w_email(session, email=email, password=veritification_code) #şifreyi güncelle def generate_password_reset_number() -> str: return str(random.randint(10000000, 99999999)) # 8 haneli rastgele bir sayı döndür def update_password_w_email( session: Annotated[Session, Depends(get_session_db)], password: str | None = None, email: EmailStr | None = None, ) -> dict: hashed_password = get_password_hash(password) session.query(DBUser).filter(DBUser.email == email).update({"hashed_password": hashed_password}) session.commit() return {"message": "Password updated successfully"} def update_password_w_user( session: Annotated[Session, Depends(get_session_db)], user: Annotated[DBUser , None], password: str | None = None, ) -> any: hashed_password = get_password_hash(password) session.query(DBUser).filter(DBUser.user_id == user.user_id).update({"hashed_password": hashed_password}) session.commit()