Add helper functions for User
This commit is contained in:
parent
5bd8d795a8
commit
46ee366919
128
helpers/user_helpers.py
Normal file
128
helpers/user_helpers.py
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
from typing import Optional, Dict, Union
|
||||||
|
import re
|
||||||
|
from datetime import datetime
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
from passlib.context import CryptContext
|
||||||
|
from models.user import User
|
||||||
|
from schemas.user import UserCreate
|
||||||
|
from email_validator import validate_email, EmailNotValidError
|
||||||
|
|
||||||
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||||
|
|
||||||
|
def validate_password_strength(password: str) -> Dict[str, Union[bool, str]]:
|
||||||
|
"""
|
||||||
|
Validate password meets minimum requirements.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
password: Password string to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing validation result and message
|
||||||
|
"""
|
||||||
|
if len(password) < 8:
|
||||||
|
return {"valid": False, "message": "Password must be at least 8 characters"}
|
||||||
|
|
||||||
|
if not any(char.isdigit() for char in password):
|
||||||
|
return {"valid": False, "message": "Password must contain at least one number"}
|
||||||
|
|
||||||
|
if not any(char.isupper() for char in password):
|
||||||
|
return {"valid": False, "message": "Password must contain at least one uppercase letter"}
|
||||||
|
|
||||||
|
return {"valid": True, "message": "Password is valid"}
|
||||||
|
|
||||||
|
def validate_user_input(email: str, username: str, password: str) -> Dict[str, Union[bool, str]]:
|
||||||
|
"""
|
||||||
|
Validate user signup input fields.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
email: Email to validate
|
||||||
|
username: Username to validate
|
||||||
|
password: Password to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing validation result and message
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
validate_email(email)
|
||||||
|
except EmailNotValidError:
|
||||||
|
return {"valid": False, "message": "Invalid email format"}
|
||||||
|
|
||||||
|
if len(username) < 3:
|
||||||
|
return {"valid": False, "message": "Username must be at least 3 characters"}
|
||||||
|
|
||||||
|
password_validation = validate_password_strength(password)
|
||||||
|
if not password_validation["valid"]:
|
||||||
|
return password_validation
|
||||||
|
|
||||||
|
return {"valid": True, "message": "All inputs are valid"}
|
||||||
|
|
||||||
|
def check_existing_user(db: Session, email: str, username: str) -> Dict[str, Union[bool, str]]:
|
||||||
|
"""
|
||||||
|
Check if user with given email or username already exists.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: Database session
|
||||||
|
email: Email to check
|
||||||
|
username: Username to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing check result and message
|
||||||
|
"""
|
||||||
|
if db.query(User).filter(User.email == email).first():
|
||||||
|
return {"exists": True, "message": "Email already registered"}
|
||||||
|
|
||||||
|
if db.query(User).filter(User.username == username).first():
|
||||||
|
return {"exists": True, "message": "Username already taken"}
|
||||||
|
|
||||||
|
return {"exists": False, "message": "User can be created"}
|
||||||
|
|
||||||
|
def hash_password(password: str) -> str:
|
||||||
|
"""
|
||||||
|
Hash password using bcrypt.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
password: Plain text password
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Hashed password string
|
||||||
|
"""
|
||||||
|
return pwd_context.hash(password)
|
||||||
|
|
||||||
|
def create_new_user(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
|
||||||
|
"""
|
||||||
|
Create new user after validation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: Database session
|
||||||
|
user_data: User creation data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Created user object or error dict
|
||||||
|
"""
|
||||||
|
# Validate inputs
|
||||||
|
validation_result = validate_user_input(user_data.email, user_data.username, user_data.password)
|
||||||
|
if not validation_result["valid"]:
|
||||||
|
return {"error": validation_result["message"]}
|
||||||
|
|
||||||
|
# Check existing user
|
||||||
|
existing_check = check_existing_user(db, user_data.email, user_data.username)
|
||||||
|
if existing_check["exists"]:
|
||||||
|
return {"error": existing_check["message"]}
|
||||||
|
|
||||||
|
# Create user
|
||||||
|
hashed_password = hash_password(user_data.password)
|
||||||
|
db_user = User(
|
||||||
|
email=user_data.email,
|
||||||
|
username=user_data.username,
|
||||||
|
password=hashed_password,
|
||||||
|
first_name=user_data.first_name,
|
||||||
|
last_name=user_data.last_name,
|
||||||
|
is_active=True,
|
||||||
|
is_verified=False
|
||||||
|
)
|
||||||
|
|
||||||
|
db.add(db_user)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(db_user)
|
||||||
|
|
||||||
|
return db_user
|
Loading…
x
Reference in New Issue
Block a user