"""Authentication routes: token login and user registration under ``/auth``."""

from datetime import timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session

from .models import (
    DBUser,
    Token,
    UserCreate,
    UserPublic,
    authenticate_user,
    create_access_token,
)
from ..auth.models import get_password_hash, verify_password
from ..config import get_session_db

# Lifetime, in minutes, of the access tokens issued by the login endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

router = APIRouter(
    prefix="/auth",
    tags=["auth"],
    responses={404: {"description": "Not found"}},
    dependencies=[],
)


@router.post('/login')
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    session: Annotated[Session, Depends(get_session_db)],
) -> Token:
    """Exchange a username/password form for a bearer access token.

    Args:
        form_data: OAuth2 password form; only ``username`` and ``password``
            are read here.
        session: Database session passed to ``authenticate_user``.

    Returns:
        A ``Token`` whose ``token_type`` is ``"bearer"``. The token is built
        from the user's ``username`` (as ``sub``), ``role`` and ``status``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``authenticate_user`` returns a falsy value.
    """
    user = authenticate_user(session, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "role": user.role, 'status': user.status},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@router.post('/register', response_model=UserPublic)
async def create_user(
    session: Annotated[Session, Depends(get_session_db)],
    # NOTE(review): with a bare Depends() FastAPI appears to read the model's
    # fields from the query string, which would put the password in the URL.
    # Confirm whether a JSON request body was intended.
    user: Annotated[UserCreate, Depends()],
):
    """Register a new user and return the stored record.

    The submitted password is hashed, the hash is checked against the
    original password, and the resulting ``DBUser`` is added, committed and
    refreshed on ``session``.

    Args:
        session: Database session used to persist the new user.
        user: Registration data; ``password`` is the only field read
            directly, the rest is forwarded to ``DBUser.model_validate``.

    Returns:
        The refreshed ``DBUser``; the route declares ``UserPublic`` as its
        response model.

    Raises:
        HTTPException: 400 when the freshly computed hash does not verify
            against the submitted password.
    """
    user_dict = user.dict()
    # Never print or log the plaintext password or its hash: stdout usually
    # ends up in process logs.
    user_dict['hashed_password'] = get_password_hash(user.password)

    # Sanity check that the hash round-trips before anything is persisted.
    if not verify_password(user.password, user_dict['hashed_password']):
        raise HTTPException(status_code=400, detail="Password hashing failed")

    db_user = DBUser.model_validate(user_dict)
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user