From e22dfc08bc366871b46ab0185298aa3011c69092 Mon Sep 17 00:00:00 2001 From: Backend IM Bot Date: Thu, 27 Mar 2025 13:05:56 +0000 Subject: [PATCH] Add helper functions for User --- helpers/user_helpers.py | 139 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 helpers/user_helpers.py diff --git a/helpers/user_helpers.py b/helpers/user_helpers.py new file mode 100644 index 0000000..41a65ef --- /dev/null +++ b/helpers/user_helpers.py @@ -0,0 +1,139 @@ +from typing import Optional, Dict, Union, List +from datetime import datetime +import re +from pydantic import EmailStr +from sqlalchemy.orm import Session +from models.user import User +from schemas.user import UserCreate, UserUpdate +from passlib.context import CryptContext + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +def validate_user_data(username: str, email: str, password: str) -> Dict[str, str]: + """ + Validate user registration data. + + Args: + username: Username to validate + email: Email to validate + password: Password to validate + + Returns: + Dict with error messages if validation fails, empty dict if validation passes + """ + errors = {} + + if len(username) < 3 or len(username) > 50: + errors["username"] = "Username must be between 3 and 50 characters" + + email_pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' + if not re.match(email_pattern, email): + errors["email"] = "Invalid email format" + + if len(password) < 8: + errors["password"] = "Password must be at least 8 characters long" + + return errors + +def hash_password(password: str) -> str: + """ + Hash a password using bcrypt. + + Args: + password: Plain text password + + Returns: + Hashed password + """ + return pwd_context.hash(password) + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """ + Verify a password against its hash. + + Args: + plain_password: Password to verify + hashed_password: Stored hash to check against + + Returns: + True if password matches, False otherwise + """ + return pwd_context.verify(plain_password, hashed_password) + +def update_last_login(db: Session, user: User) -> User: + """ + Update user's last login timestamp. + + Args: + db: Database session + user: User object to update + + Returns: + Updated user object + """ + user.last_login = datetime.utcnow() + db.commit() + db.refresh(user) + return user + +def format_user_response(user: User) -> Dict[str, Any]: + """ + Format user data for API response. + + Args: + user: User object to format + + Returns: + Dictionary with formatted user data + """ + return { + "id": user.id, + "username": user.username, + "email": user.email, + "first_name": user.first_name, + "last_name": user.last_name, + "is_active": user.is_active, + "is_verified": user.is_verified, + "last_login": user.last_login.isoformat() if user.last_login else None + } + +def search_users( + db: Session, + search_term: str, + skip: int = 0, + limit: int = 10 +) -> List[User]: + """ + Search users by username, first name, or last name. + + Args: + db: Database session + search_term: Term to search for + skip: Number of records to skip + limit: Maximum number of records to return + + Returns: + List of matching user objects + """ + return db.query(User).filter( + (User.username.ilike(f"%{search_term}%")) | + (User.first_name.ilike(f"%{search_term}%")) | + (User.last_name.ilike(f"%{search_term}%")) + ).offset(skip).limit(limit).all() + +def toggle_user_status(db: Session, user: User, activate: bool) -> User: + """ + Toggle user active status. + + Args: + db: Database session + user: User object to update + activate: True to activate, False to deactivate + + Returns: + Updated user object + """ + user.is_active = activate + db.commit() + db.refresh(user) + return user \ No newline at end of file