from typing import Optional, Dict, Union import re from datetime import datetime from sqlalchemy.orm import Session from passlib.context import CryptContext from models.user import User from schemas.user import UserCreate from email_validator import validate_email, EmailNotValidError pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def validate_password_strength(password: str) -> Dict[str, Union[bool, str]]: """ Validate password meets minimum requirements. Args: password: Password string to validate Returns: Dict containing validation result and message """ if len(password) < 8: return {"valid": False, "message": "Password must be at least 8 characters"} if not any(char.isdigit() for char in password): return {"valid": False, "message": "Password must contain at least one number"} if not any(char.isupper() for char in password): return {"valid": False, "message": "Password must contain at least one uppercase letter"} return {"valid": True, "message": "Password is valid"} def validate_user_input(email: str, username: str, password: str) -> Dict[str, Union[bool, str]]: """ Validate user signup input fields. Args: email: Email to validate username: Username to validate password: Password to validate Returns: Dict containing validation result and message """ try: validate_email(email) except EmailNotValidError: return {"valid": False, "message": "Invalid email format"} if len(username) < 3: return {"valid": False, "message": "Username must be at least 3 characters"} password_validation = validate_password_strength(password) if not password_validation["valid"]: return password_validation return {"valid": True, "message": "All inputs are valid"} def check_existing_user(db: Session, email: str, username: str) -> Dict[str, Union[bool, str]]: """ Check if user with given email or username already exists. Args: db: Database session email: Email to check username: Username to check Returns: Dict containing check result and message """ if db.query(User).filter(User.email == email).first(): return {"exists": True, "message": "Email already registered"} if db.query(User).filter(User.username == username).first(): return {"exists": True, "message": "Username already taken"} return {"exists": False, "message": "User can be created"} def hash_password(password: str) -> str: """ Hash password using bcrypt. Args: password: Plain text password Returns: Hashed password string """ return pwd_context.hash(password) def create_new_user(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]: """ Create new user after validation. Args: db: Database session user_data: User creation data Returns: Created user object or error dict """ # Validate inputs validation_result = validate_user_input(user_data.email, user_data.username, user_data.password) if not validation_result["valid"]: return {"error": validation_result["message"]} # Check existing user existing_check = check_existing_user(db, user_data.email, user_data.username) if existing_check["exists"]: return {"error": existing_check["message"]} # Create user hashed_password = hash_password(user_data.password) db_user = User( email=user_data.email, username=user_data.username, password=hashed_password, first_name=user_data.first_name, last_name=user_data.last_name, is_active=True, is_verified=False ) db.add(db_user) db.commit() db.refresh(db_user) return db_user