diff --git a/helpers/user_helpers.py b/helpers/user_helpers.py new file mode 100644 index 0000000..257c6b9 --- /dev/null +++ b/helpers/user_helpers.py @@ -0,0 +1,128 @@ +from typing import Optional, Dict, Union +import re +from datetime import datetime +from sqlalchemy.orm import Session +from passlib.context import CryptContext +from models.user import User +from schemas.user import UserCreate +from email_validator import validate_email, EmailNotValidError + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +def validate_password_strength(password: str) -> Dict[str, Union[bool, str]]: + """ + Validate password meets minimum requirements. + + Args: + password: Password string to validate + + Returns: + Dict containing validation result and message + """ + if len(password) < 8: + return {"valid": False, "message": "Password must be at least 8 characters"} + + if not any(char.isdigit() for char in password): + return {"valid": False, "message": "Password must contain at least one number"} + + if not any(char.isupper() for char in password): + return {"valid": False, "message": "Password must contain at least one uppercase letter"} + + return {"valid": True, "message": "Password is valid"} + +def validate_user_input(email: str, username: str, password: str) -> Dict[str, Union[bool, str]]: + """ + Validate user signup input fields. + + Args: + email: Email to validate + username: Username to validate + password: Password to validate + + Returns: + Dict containing validation result and message + """ + try: + validate_email(email) + except EmailNotValidError: + return {"valid": False, "message": "Invalid email format"} + + if len(username) < 3: + return {"valid": False, "message": "Username must be at least 3 characters"} + + password_validation = validate_password_strength(password) + if not password_validation["valid"]: + return password_validation + + return {"valid": True, "message": "All inputs are valid"} + +def check_existing_user(db: Session, email: str, username: str) -> Dict[str, Union[bool, str]]: + """ + Check if user with given email or username already exists. + + Args: + db: Database session + email: Email to check + username: Username to check + + Returns: + Dict containing check result and message + """ + if db.query(User).filter(User.email == email).first(): + return {"exists": True, "message": "Email already registered"} + + if db.query(User).filter(User.username == username).first(): + return {"exists": True, "message": "Username already taken"} + + return {"exists": False, "message": "User can be created"} + +def hash_password(password: str) -> str: + """ + Hash password using bcrypt. + + Args: + password: Plain text password + + Returns: + Hashed password string + """ + return pwd_context.hash(password) + +def create_new_user(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]: + """ + Create new user after validation. + + Args: + db: Database session + user_data: User creation data + + Returns: + Created user object or error dict + """ + # Validate inputs + validation_result = validate_user_input(user_data.email, user_data.username, user_data.password) + if not validation_result["valid"]: + return {"error": validation_result["message"]} + + # Check existing user + existing_check = check_existing_user(db, user_data.email, user_data.username) + if existing_check["exists"]: + return {"error": existing_check["message"]} + + # Create user + hashed_password = hash_password(user_data.password) + db_user = User( + email=user_data.email, + username=user_data.username, + password=hashed_password, + first_name=user_data.first_name, + last_name=user_data.last_name, + is_active=True, + is_verified=False + ) + + db.add(db_user) + db.commit() + db.refresh(db_user) + + return db_user \ No newline at end of file