Revert "Merge pull request 'add updates to show bedir' (#14) from another into main"

This reverts commit 99f611b3d0, reversing
changes made to d5588dd055.
This commit is contained in:
Osman Faruk Bayram 2025-05-05 21:25:00 +03:00
parent 99f611b3d0
commit 83389e0c10
6 changed files with 210 additions and 118 deletions

View file

@ -1,27 +1,135 @@
from sqlalchemy import Column, Integer, String, Enum, DateTime from enum import Enum
import enum, datetime from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from backend.config import pwd_context, get_session_db
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from fastapi import Depends, HTTPException
from typing import Annotated, Optional
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
import jwt
from sqlmodel import SQLModel, Field, Session, select
from pydantic.networks import EmailStr
from ..config import Base class Token(BaseModel):
access_token: str
token_type: str
class Role(str, enum.Enum):
admin = "admin" ### ENUMS ###
class Role(str, Enum):
user = "user" user = "user"
admin = "admin"
guest = "guest"
mod = "mod" mod = "mod"
class Status(str, enum.Enum): class Status(str, Enum):
banned = "banned"
active = "active" active = "active"
banned = "banned"
suspended = "suspended" suspended = "suspended"
class User(Base): ### KULLANICI MODELLERİ ###
__tablename__ = "users" class UserBase(SQLModel):
user_id = Column(Integer, primary_key=True) username: Optional[str] = None
username = Column(String, unique=True) user_id: Optional[int] = None
name = Column(String) role: Optional[Role] = None
surname = Column(String) status: Optional[Status] = None
hashedPassword = Column(String)
email = Column(String, unique=True) class UserInDb(UserBase):
role = Column(Enum(Role), default=Role.user) hashed_password: str | None = None
status = Column(Enum(Status), default=Status.active)
bio = Column(String(144)) class UserPublic(UserBase):
created_date = Column(DateTime, default=datetime.datetime.utcnow) pass
class UserCreate(BaseModel):
username: Optional[str] = None
role: Optional[Role] = None
email : EmailStr | None = None
status: Optional[Status] = None
password : str | None = None
### VERİTABANI MODELİ ###
class DBUser(SQLModel, table=True):
__tablename__ = "users" # opsiyonel, sqlmodel bunu otomatik de atar
user_id: Optional[int] = Field(default=None, primary_key=True)
username: str = Field(index=True, nullable=False)
hashed_password: str = Field(nullable=False)
role: Role = Field(default=Role.user)
status: Status = Field(default=Status.active)
### AUTH ###
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def authenticate_user(
session: Annotated[Session, Depends(get_session_db)],
username: str,
password: str
) -> UserInDb | None:
statement = select(DBUser).where(DBUser.username == username)
result = session.exec(statement).first()
if not result or not verify_password(password, result.hashed_password):
return None
return result
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
return encoded_jwt
async def get_user(
session: Annotated[Session, Depends(get_session_db)],
username: str
) -> UserInDb | None:
statement = select(DBUser).where(DBUser.username == username)
result = session.exec(statement).first()
return result
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Annotated[Session, Depends(get_session_db)]
) -> UserPublic:
credentials_exception = HTTPException(
status_code=401,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username: Optional[str] = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = await get_user(session, username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[UserInDb, Depends(get_current_user)]
) -> UserPublic:
if current_user.status == Status.banned:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

View file

@ -1,14 +1,15 @@
import os from fastapi import APIRouter, Depends, HTTPException, status
from .models import Token, UserPublic
from fastapi import APIRouter, HTTPException from .models import authenticate_user, create_access_token
import bcrypt from datetime import timedelta
import jwt from ..auth.models import get_password_hash, verify_password
from typing import Annotated
from sqlmodel import Session
from ..config import get_session_db
from fastapi import Depends from fastapi import Depends
from sqlalchemy.orm import Session from fastapi.security import OAuth2PasswordRequestForm
from .models import User from .models import UserCreate, DBUser
from .schemas import UserCreate, UserOut, UserLogin
from ..config import get_db
router = APIRouter( router = APIRouter(
prefix="/auth", prefix="/auth",
@ -17,40 +18,41 @@ router = APIRouter(
dependencies=[], dependencies=[],
) )
def create_token(user: User): @router.post('/login')
return jwt.encode({"sub": user.username}, os.getenv("SECRET_KEY"), algorithm=os.getenv("ALGORITHM")) async def login_for_access_token(
form_data : Annotated[OAuth2PasswordRequestForm, Depends()],
session : Annotated[Session, Depends(get_session_db)],
) -> Token:
def verify_token(token: str): user = authenticate_user(session, form_data.username, form_data.password)
try: if not user:
data = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]) raise HTTPException(
return data.get("sub") status_code=status.HTTP_401_UNAUTHORIZED,
except jwt.ExpiredSignatureError: detail="Incorrect username or password",
raise HTTPException(401, "Token expired") headers={"WWW-Authenticate": "Bearer"},
except jwt.InvalidTokenError: )
raise HTTPException(401, "Invalid token") access_token_expires = timedelta(minutes=30)
access_token = create_access_token(
data={"sub": user.username, "role": user.role, 'status': user.status}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@router.post("/register") @router.post('/register', response_model=UserPublic)
def register(user: UserCreate, db: Session = Depends(get_db)): async def create_user(
if db.query(User).filter_by(username=user.username).first(): session : Annotated[Session, Depends(get_session_db)],
raise HTTPException(400, "Username taken") user : Annotated[UserCreate, Depends()]
hashed = bcrypt.hashpw(user.password.encode(), bcrypt.gensalt()).decode() ):
db_user = User(**user.model_dump(exclude={"password"}), hashedPassword=hashed) user_dict = user.dict()
db.add(db_user) print(user.password)
db.commit() user_dict['hashed_password'] = get_password_hash(user.password)
return {"msg": "User created"} print (user_dict['hashed_password'])
@router.post("/login") if not verify_password(user.password, user_dict['hashed_password']):
def login(user: UserLogin, db: Session = Depends(get_db)): raise HTTPException(status_code=400, detail="Password hashing failed")
db_user = db.query(User).filter_by(username=user.username).first()
if not db_user or not bcrypt.checkpw(user.password.encode(), db_user.hashedPassword.encode()):
raise HTTPException(401, "Invalid creds")
return {"token": create_token(db_user)}
@router.get("/me", response_model=UserOut) db_user = DBUser.model_validate(user_dict)
def get_me(token: str, db: Session = Depends(get_db)): session.add(db_user)
username = verify_token(token) session.commit()
if not username: session.refresh(db_user)
raise HTTPException(401, "Invalid token") return db_user
user = db.query(User).filter_by(username=username).first()
return user

View file

@ -1,23 +0,0 @@
from pydantic import BaseModel, EmailStr
from .models import Role, Status
class UserCreate(BaseModel):
username: str
name: str
surname: str
password: str
email: EmailStr
bio: str = ""
class UserOut(BaseModel):
username: str
name: str
surname: str
email: EmailStr
role: Role
status: Status
bio: str
class UserLogin(BaseModel):
username: str
password: str

View file

@ -1,13 +1,17 @@
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from passlib.context import CryptContext
from sqlmodel import SQLModel, Field, Session
from dotenv import load_dotenv from dotenv import load_dotenv
import os import os
load_dotenv() load_dotenv()
# Veritabanı URL'sini oluştur
DATABASE_URL = ( DATABASE_URL = (
f"postgresql://{os.getenv('USERNAME_DB')}:" f"postgresql://{os.getenv('USERNAME_DB')}:"
f"{os.getenv('PASSWORD_DB')}@" f"{os.getenv('PASSWORD_DB')}@"
@ -16,24 +20,16 @@ DATABASE_URL = (
f"{os.getenv('NAME_DB')}" f"{os.getenv('NAME_DB')}"
) )
engine = create_engine(DATABASE_URL, echo=False)
def init_db():
SQLModel.metadata.create_all(engine)
engine = create_engine(DATABASE_URL) def get_session_db():
SessionLocal = sessionmaker(bind=engine) with Session(engine) as session:
Base = declarative_base() yield session
from .auth.models import *
from .items.models import *
Base.metadata.create_all(bind=engine)
def get_db(): ### SECRET KEY ###
db = SessionLocal()
try:
yield db
finally:
db.close()
origins = [ origins = [
"http://localhost", "http://localhost",
"http://localhost:8080", "http://localhost:8080",
@ -42,6 +38,10 @@ origins = [
] ]
app = FastAPI() app = FastAPI()
@app.on_event("startup")
def on_startup():
init_db()
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=origins, allow_origins=origins,
@ -49,3 +49,6 @@ app.add_middleware(
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*"],
) )

View file

@ -1,15 +1,12 @@
from datetime import datetime from datetime import datetime, timedelta, timezone
from ..config import Base from ..auth.models import UserBase
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column class UserProfile(UserBase):
bio : str | None = None
created_date : datetime | None = None
collections : list[str] | None = None
items :list[str] | None = None
class Item(Base):
__tablename__ = "items"
item_id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
description = Column(String(500), nullable=True)
price = Column(Integer, nullable=False)
created_date = Column(DateTime, default=datetime.utcnow)
updated_date = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
user: Mapped["User"] = relationship(back_populates="items")

View file

@ -1,4 +1,7 @@
from .models import UserProfile
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from typing import Annotated
from ..auth.models import get_current_active_user
router = APIRouter( router = APIRouter(
prefix="/items", prefix="/items",
@ -7,7 +10,9 @@ router = APIRouter(
dependencies=[], dependencies=[],
) )
@router.get('/profile', response_model=UserProfile)
async def get_user_profile(
current_user: Annotated[UserProfile, Depends(get_current_active_user)]
) -> UserProfile:
@router.get("/") return current_user
async def get_items():
return {"message": "List of items"}