user process

This commit is contained in:
bdrtr 2025-05-07 18:33:35 +03:00
parent 36da53a562
commit 842c127817
2 changed files with 101 additions and 3 deletions

View file

@ -1,4 +1,6 @@
from enum import Enum
import random
import smtplib
from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES ,pwd_context, get_session_db, Base
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
@ -8,6 +10,7 @@ from fastapi.security import OAuth2PasswordBearer
from pydantic.networks import EmailStr
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session
from email.message import EmailMessage
import jwt
class Token(BaseModel):
@ -170,4 +173,68 @@ def register_user(
session.add(db_user) # donuşum yaparken dikkat et
session.commit()
session.refresh(db_user)
return db_user
return db_user
def find_user_w_email(
session: Annotated[Session, Depends(get_session_db)],
email: EmailStr | None = None,
):
exist_user = session.query(DBUser).filter(DBUser.email == email).first() #email ile kullanıcıyı bul
if exist_user is None:
raise HTTPException(status_code=400, detail="User not found")
if exist_user.status == Status.banned:
raise HTTPException(status_code=400, detail="Inactive user")
return True
def send_password_to_email(
session: Annotated[Session, Depends(get_session_db)],
email: EmailStr | None = None,
) -> str:
msg = EmailMessage() #obje oluştur
msg['Subject'] = 'Password Reset'
msg['From'] = 'hansneiumann@gmail.com'
msg['To'] = email
veritification_code = generate_password_reset_number()
msg.set_content(veritification_code)
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
smtp.login("hansneiumann@gmail.com", "rwaq mbil lzut dgja")
smtp.send_message(msg)
update_password_w_email(session, email=email, password=veritification_code) #şifreyi güncelle
def generate_password_reset_number() -> str:
return str(random.randint(10000000, 99999999)) # 8 haneli rastgele bir sayı döndür
def update_password_w_email(
session: Annotated[Session, Depends(get_session_db)],
password: str | None = None,
email: EmailStr | None = None,
) -> dict:
hashed_password = get_password_hash(password)
session.query(DBUser).filter(DBUser.email == email).update({"hashed_password": hashed_password})
session.commit()
return {"message": "Password updated successfully"}
def update_password_w_user(
session: Annotated[Session, Depends(get_session_db)],
user: Annotated[DBUser , None],
password: str | None = None,
):
hashed_password = get_password_hash(password)
session.query(DBUser).filter(DBUser.user_id == user.user_id).update({"hashed_password": hashed_password})
session.commit()
session.refresh(user)