from typing import Optional, Dict, Union, List from datetime import datetime import re from pydantic import EmailStr from sqlalchemy.orm import Session from models.user import User from schemas.user import UserCreate, UserUpdate from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def validate_user_data(username: str, email: str, password: str) -> Dict[str, str]: """ Validate user registration data. Args: username: Username to validate email: Email to validate password: Password to validate Returns: Dict with error messages if validation fails, empty dict if validation passes """ errors = {} if len(username) < 3 or len(username) > 50: errors["username"] = "Username must be between 3 and 50 characters" email_pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' if not re.match(email_pattern, email): errors["email"] = "Invalid email format" if len(password) < 8: errors["password"] = "Password must be at least 8 characters long" return errors def hash_password(password: str) -> str: """ Hash a password using bcrypt. Args: password: Plain text password Returns: Hashed password """ return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a password against its hash. Args: plain_password: Password to verify hashed_password: Stored hash to check against Returns: True if password matches, False otherwise """ return pwd_context.verify(plain_password, hashed_password) def update_last_login(db: Session, user: User) -> User: """ Update user's last login timestamp. Args: db: Database session user: User object to update Returns: Updated user object """ user.last_login = datetime.utcnow() db.commit() db.refresh(user) return user def format_user_response(user: User) -> Dict[str, Any]: """ Format user data for API response. Args: user: User object to format Returns: Dictionary with formatted user data """ return { "id": user.id, "username": user.username, "email": user.email, "first_name": user.first_name, "last_name": user.last_name, "is_active": user.is_active, "is_verified": user.is_verified, "last_login": user.last_login.isoformat() if user.last_login else None } def search_users( db: Session, search_term: str, skip: int = 0, limit: int = 10 ) -> List[User]: """ Search users by username, first name, or last name. Args: db: Database session search_term: Term to search for skip: Number of records to skip limit: Maximum number of records to return Returns: List of matching user objects """ return db.query(User).filter( (User.username.ilike(f"%{search_term}%")) | (User.first_name.ilike(f"%{search_term}%")) | (User.last_name.ilike(f"%{search_term}%")) ).offset(skip).limit(limit).all() def toggle_user_status(db: Session, user: User, activate: bool) -> User: """ Toggle user active status. Args: db: Database session user: User object to update activate: True to activate, False to deactivate Returns: Updated user object """ user.is_active = activate db.commit() db.refresh(user) return user