backend/auth/models.py
2025-05-08 22:43:42 +03:00

252 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from enum import Enum
import random
import smtplib
from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES ,pwd_context, get_session_db, Base
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from fastapi import Depends, HTTPException
from typing import Annotated
from fastapi.security import OAuth2PasswordBearer
from pydantic.networks import EmailStr
from sqlalchemy import Integer, DateTime
from sqlalchemy.orm import Session, relationship, mapped_column, Mapped
from sqlalchemy.dialects.postgresql import ARRAY
from email.message import EmailMessage
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..items.models import Items
import jwt
class Token(BaseModel):
    """Pydantic schema pairing an access token with its token type."""

    # The encoded token string.
    access_token: str
    # Scheme label for the token; presumably "bearer" given the
    # OAuth2PasswordBearer scheme used in this module -- confirm in the login route.
    token_type: str
### ENUMS ###
class Role(str, Enum):
    """Roles a user account can hold.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    user = "user"
    admin = "admin"
    guest = "guest"
    mod = "mod"
class Status(str, Enum):
    """Account states a user can be in.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    active = "active"
    banned = "banned"
    suspended = "suspended"
### USER MODELS ### SQLAlchemy models and Pydantic models are different things
class UserBase(BaseModel):
    """Common user fields shared by the Pydantic user schemas.

    NOTE: this is a Pydantic schema, not a database model.
    Every field is optional and defaults to ``None``.
    """

    # ``X | None`` is used here; ``Optional``/``Union`` would work as well.
    username: str | None = None
    role: Role | None = None
    status: Status | None = None
class UserInDb(UserBase):
    """``UserBase`` extended with the id, e-mail and password hash of a user.

    Every added field is optional and defaults to ``None``.
    """

    user_id: int | None = None
    email: EmailStr | None = None
    hashed_password: str | None = None
class UserPublic(BaseModel):
    """User schema limited to username, role and status.

    It declares the same three fields as ``UserBase`` but derives directly
    from ``BaseModel``; it carries no e-mail or password hash.
    """

    username: str | None = None
    role: Role | None = None
    status: Status | None = None
class UserCreate(BaseModel):
    """Input schema consumed by ``register_user`` when creating an account.

    ``password`` is the plain-text password; ``register_user`` replaces it
    with a hash before the row is stored. Every field is optional at the
    schema level and defaults to ``None``.
    """

    username: str | None = None
    role: Role | None = None
    email: EmailStr | None = None
    status: Status | None = None
    password: str | None = None
### DATABASE MODEL ###
class DBUser(Base):
    """SQLAlchemy ORM model mapped to the ``users_table`` table.

    Column defaults that must be computed per row (``created_date``,
    ``follow_users``) are given as callables so SQLAlchemy invokes them on
    each INSERT instead of reusing one value built when this class was defined.
    """

    __tablename__ = "users_table"

    user_id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True)
    username: Mapped[str] = mapped_column(unique=True, index=True, nullable=False)
    email: Mapped[str] = mapped_column(unique=True, index=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(nullable=False)
    role: Mapped[Role] = mapped_column(default=Role.user)
    status: Mapped[Status] = mapped_column(default=Status.active)

    # ``datetime`` is the Python type, ``DateTime`` the SQLAlchemy column type.
    # Pass the callable, not ``datetime.now()``: calling it here would freeze
    # the timestamp at import time and stamp every new row with that same value.
    created_date: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    bio: Mapped[str] = mapped_column(default="No bio")

    # Ids of the users this user follows. ``list`` (callable) yields a fresh
    # empty list per row rather than one list object shared by all inserts.
    follow_users: Mapped[list[int]] = mapped_column(ARRAY(Integer), default=list)

    # -> relationships to other tables
    items: Mapped[list["Items"]] = relationship(
        "Items", back_populates="user", cascade="all, delete-orphan"
    )
    # Added only to avoid an error; the collections feature is not active yet.
    collections: Mapped[int] = mapped_column(default=0)
### AUTH ###
# OAuth2 password-bearer scheme; `get_current_user` depends on it to obtain the
# token string. `tokenUrl` names the login endpoint ("/auth/login").
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
### SERVICES ###
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash using ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plain-text password with ``pwd_context`` and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
def authenticate_user(
    session: Annotated[Session, Depends(get_session_db)],
    username: str,
    password: str
) -> UserInDb | None:
    """Look a user up by username and check the supplied password.

    Returns the matching ``DBUser`` row when the username exists and the
    password verifies against its stored hash; otherwise ``None``.
    (The annotation names ``UserInDb``, but the object handed back is the
    ORM row itself.)
    """
    candidate = session.query(DBUser).filter(DBUser.username == username).first()

    # SQLAlchemy's ``first()`` yields None when no row matches.
    if candidate is None:
        return None
    if not verify_password(password, candidate.hashed_password):
        return None
    return candidate
def create_access_token(
    data: dict,
    expires_delta: timedelta | None = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
) -> str:
    """Build a signed JWT from ``data`` with an ``exp`` claim added.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. Defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES``; an explicit ``None`` falls back
            to that same default instead of failing on ``datetime + None``.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Expiry is computed in UTC so it does not depend on the server's local zone.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_user(
    session: Annotated[Session, Depends(get_session_db)],
    username: str
) -> UserInDb | None:
    """Fetch the first ``DBUser`` row whose username equals ``username``.

    Returns the ORM row, or ``None`` when no row matches. (The annotation
    names ``UserInDb``, but the object handed back is the ORM row itself.)
    """
    by_username = session.query(DBUser).filter(DBUser.username == username)
    return by_username.first()
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[Session, Depends(get_session_db)]
) -> UserPublic:
    """Resolve the user identified by a JWT.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM``; its ``sub`` claim
    is taken as the username and looked up through ``get_user``.

    Returns:
        The user object returned by ``get_user`` (the ORM row).

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Invalid credentials currently",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode can raise PyJWTError, so keep the try body to that call.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as exc:
        raise credentials_exception from exc

    # The payload is no longer run through ``UserInDb.model_validate``: its
    # result was never used, and a validation failure escaped the handler
    # above as an unhandled error rather than a 401.
    username: str | None = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = await get_user(session, username)
    if user is None:
        raise credentials_exception
    return user
async def get_current_active_user(
    current_user: Annotated[UserInDb, Depends(get_current_user)]
) -> UserPublic:
    """Return the current user unless the account is banned.

    Raises:
        HTTPException: 400 when the user's status is ``Status.banned``.
            Other statuses are passed through unchanged.
    """
    is_banned = current_user.status == Status.banned
    if not is_banned:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
### User registration
def register_user(
    session: Annotated[Session, Depends(get_session_db)],
    user: Annotated[UserCreate, Depends()]
) -> UserPublic:
    """Create a new ``DBUser`` row from a ``UserCreate`` payload.

    The plain-text password is hashed and never stored; the username and
    e-mail must not already be taken.

    Returns:
        The freshly inserted and refreshed ``DBUser`` row.

    Raises:
        HTTPException: 400 when username, e-mail or password is missing,
            when the username or e-mail is already registered, or when the
            hash fails its sanity check.
    """
    from sqlalchemy.exc import IntegrityError

    # All three are optional on the schema but required by the table
    # (nullable=False) and by the hasher, so reject missing values up front.
    if None in (user.username, user.email, user.password):
        raise HTTPException(status_code=400, detail="Username, email and password are required")

    # Check uniqueness before hashing so duplicates do not pay for a hash.
    existing_user = session.query(DBUser).filter(
        (DBUser.username == user.username) | (DBUser.email == user.email)
    ).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Username or email already registered")

    # NOTE(review): ``role`` and ``status`` come straight from the request
    # payload, so a caller can ask for e.g. the admin role -- confirm that the
    # route restricts this.
    user_dict = user.model_dump()
    # The ORM model has no ``password`` attribute; only the hash is stored.
    user_dict.pop("password")
    user_dict["hashed_password"] = get_password_hash(user.password)
    if not verify_password(user.password, user_dict["hashed_password"]):
        raise HTTPException(status_code=400, detail="Password hashing failed")

    # Same value the previous "%Y-%m-%d %H:%M:%S" string carried (UTC, second
    # precision), but as a datetime object to match the DateTime column.
    user_dict["created_date"] = datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)

    db_user = DBUser(**user_dict)
    session.add(db_user)
    try:
        session.commit()
    except IntegrityError as exc:
        # A concurrent registration can pass the check above and still hit
        # the unique constraints at commit time.
        session.rollback()
        raise HTTPException(status_code=400, detail="Username or email already registered") from exc
    session.refresh(db_user)
    return db_user
def find_user_w_email(
    session: Annotated[Session, Depends(get_session_db)],
    email: EmailStr | None = None,
):
    """Confirm that a non-banned user exists for ``email``.

    Returns:
        ``True`` when a user with that e-mail exists and is not banned.

    Raises:
        HTTPException: 400 "User not found" when no row matches, or
            400 "Inactive user" when the matching user is banned.
    """
    # Look the user up by e-mail.
    match = session.query(DBUser).filter(DBUser.email == email).first()

    if match is None:
        raise HTTPException(status_code=400, detail="User not found")

    if match.status == Status.banned:
        raise HTTPException(status_code=400, detail="Inactive user")

    return True
def send_password_to_email(
    session: Annotated[Session, Depends(get_session_db)],
    email: EmailStr | None = None,
) -> None:
    """E-mail a freshly generated code and set it as the account password.

    A code from ``generate_password_reset_number`` is sent to ``email`` over
    Gmail SMTP (SSL, port 465); once the send succeeds the same code is
    stored as the new password through ``update_password_w_email``.

    The code is intentionally not returned: it is the account's new password
    and must reach the user only through the e-mail.

    Raises:
        HTTPException: 400 when ``email`` is ``None``.
    """
    import os

    if email is None:
        raise HTTPException(status_code=400, detail="Email is required")

    # SECURITY: these credentials were hard-coded in source control. They can
    # now be overridden through the environment; the literals remain only as a
    # fallback so existing deployments keep working. Rotate the app password
    # and delete the fallback values.
    smtp_user = os.environ.get("AUTH_SMTP_USER", "hansneiumann@gmail.com")
    smtp_password = os.environ.get("AUTH_SMTP_PASSWORD", "rwaq mbil lzut dgja")

    verification_code = generate_password_reset_number()

    msg = EmailMessage()
    msg["Subject"] = "Password Reset"
    msg["From"] = smtp_user
    msg["To"] = email
    msg.set_content(verification_code)

    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
        smtp.login(smtp_user, smtp_password)
        smtp.send_message(msg)

    # Update only after the send succeeded, so a failed delivery does not
    # leave the user locked out with a password they never received.
    update_password_w_email(session, email=email, password=verification_code)
def generate_password_reset_number() -> str:
    """Return a random 8-digit number as a string.

    The value is drawn from the ``secrets`` module rather than ``random``
    because ``send_password_to_email`` stores it as the account password, so
    it must not be predictable.
    """
    import secrets

    # randbelow(90_000_000) is 0..89_999_999, so the sum spans
    # 10_000_000..99_999_999 -- the same inclusive range as before.
    return str(10_000_000 + secrets.randbelow(90_000_000))
def update_password_w_email(
    session: Annotated[Session, Depends(get_session_db)],
    password: str | None = None,
    email: EmailStr | None = None,
) -> dict:
    """Hash ``password`` and store it on the rows whose e-mail is ``email``.

    The update is committed before returning. The success message is
    returned unconditionally; the number of rows matched is not inspected.
    """
    new_hash = get_password_hash(password)

    rows_for_email = session.query(DBUser).filter(DBUser.email == email)
    rows_for_email.update({"hashed_password": new_hash})
    session.commit()

    result = {"message": "Password updated successfully"}
    return result
def update_password_w_user(
    session: Annotated[Session, Depends(get_session_db)],
    user: Annotated[DBUser, None],
    password: str | None = None,
) -> any:
    """Hash ``password`` and store it on the row with ``user.user_id``.

    The update is committed immediately.
    """
    # NOTE(review): the return annotation is the builtin ``any`` function,
    # probably meant as ``typing.Any`` -- confirm and correct.
    new_hash = get_password_hash(password)

    rows_for_user = session.query(DBUser).filter(DBUser.user_id == user.user_id)
    rows_for_user.update({"hashed_password": new_hash})
    session.commit()