diff --git a/helpers/user_helpers.py b/helpers/user_helpers.py index 54c7021..f0f2ff2 100644 --- a/helpers/user_helpers.py +++ b/helpers/user_helpers.py @@ -1,14 +1,14 @@ -from typing import List, Dict, Optional, Union +from typing import List, Dict, Optional, Union, Any from datetime import datetime import re from sqlalchemy.orm import Session -from fastapi import UploadFile -import os -from models.user import User, UserRoles +from passlib.context import CryptContext +from models.user import User from schemas.user import UserCreate, UserUpdate -from PIL import Image -def validate_user_data(user_data: Union[UserCreate, UserUpdate]) -> Dict[str, str]: +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +def validate_user_data(user_data: UserCreate) -> Dict[str, str]: """ Validate user input data. @@ -20,116 +20,156 @@ def validate_user_data(user_data: Union[UserCreate, UserUpdate]) -> Dict[str, st """ errors = {} - # Email validation - if hasattr(user_data, 'email'): - email_pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' - if not re.match(email_pattern, user_data.email): - errors['email'] = "Invalid email format" - - # Phone validation - if hasattr(user_data, 'phone_number'): - phone_pattern = r'^\+?1?\d{9,15}$' - if not re.match(phone_pattern, user_data.phone_number): - errors['phone'] = "Invalid phone number format" - - # Age validation - if hasattr(user_data, 'date_of_birth'): - age = (datetime.now() - user_data.date_of_birth).days / 365 - if age < 5: # Minimum age for school - errors['age'] = "User must be at least 5 years old" - + if not user_data.first_name or len(user_data.first_name) < 2: + errors["first_name"] = "First name must be at least 2 characters" + + if not user_data.last_name or len(user_data.last_name) < 2: + errors["last_name"] = "Last name must be at least 2 characters" + + if not validate_email(user_data.email): + errors["email"] = "Invalid email format" + + if not validate_phone(user_data.phone): + errors["phone"] = "Invalid phone number format" + + if not user_data.password or len(user_data.password) < 8: + errors["password"] = "Password must be at least 8 characters" + return errors -async def save_profile_picture(user_id: int, file: UploadFile) -> Optional[str]: +def validate_email(email: str) -> bool: """ - Save and process user profile picture. + Validate email format. Args: - user_id: ID of the user - file: Uploaded image file + email: Email to validate Returns: - Path to saved image or None if failed + bool: True if valid, False otherwise """ - try: - # Create directory if not exists - upload_dir = f"uploads/profile_pictures/{user_id}" - os.makedirs(upload_dir, exist_ok=True) - - # Save file path - file_path = f"{upload_dir}/{file.filename}" - - # Save uploaded file - with open(file_path, "wb") as buffer: - content = await file.read() - buffer.write(content) - - # Process image - resize to standard size - with Image.open(file_path) as img: - img = img.resize((200, 200)) - img.save(file_path) - - return file_path - - except Exception: - return None + pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' + return bool(re.match(pattern, email)) -def get_users_by_role(db: Session, role: UserRoles, active_only: bool = True) -> List[User]: +def validate_phone(phone: str) -> bool: """ - Get all users with specific role. + Validate phone number format. + + Args: + phone: Phone number to validate + + Returns: + bool: True if valid, False otherwise + """ + pattern = r'^\+?1?\d{9,15}$' + return bool(re.match(pattern, phone)) + +def hash_password(password: str) -> str: + """ + Hash a password using bcrypt. + + Args: + password: Plain text password + + Returns: + str: Hashed password + """ + return pwd_context.hash(password) + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """ + Verify a password against its hash. + + Args: + plain_password: Password to verify + hashed_password: Stored hash to check against + + Returns: + bool: True if password matches, False otherwise + """ + return pwd_context.verify(plain_password, hashed_password) + +def get_user_by_email(db: Session, email: str) -> Optional[User]: + """ + Get user by email. Args: db: Database session - role: Role to filter by - active_only: Whether to return only active users + email: Email to search for Returns: - List of users with specified role + User if found, None otherwise """ - query = db.query(User).filter(User.role == role) - - if active_only: - query = query.filter(User.is_active == True) - - return query.all() + return db.query(User).filter(User.email == email).first() -def update_last_login(db: Session, user_id: int) -> bool: +def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]: """ - Update user's last login timestamp. + Create new user with validation. + + Args: + db: Database session + user_data: User data + + Returns: + Created user or error dict + """ + validation_errors = validate_user_data(user_data) + if validation_errors: + return {"errors": validation_errors} + + existing_user = get_user_by_email(db, user_data.email) + if existing_user: + return {"error": "Email already registered"} + + hashed_password = hash_password(user_data.password) + + db_user = User( + first_name=user_data.first_name, + last_name=user_data.last_name, + email=user_data.email, + phone=user_data.phone, + password=hashed_password, + address=user_data.address, + city=user_data.city, + country=user_data.country + ) + + db.add(db_user) + db.commit() + db.refresh(db_user) + + return db_user + +def update_user_safely(db: Session, user_id: int, user_data: UserUpdate) -> Union[User, Dict[str, str]]: + """ + Update user data with validation. Args: db: Database session user_id: ID of user to update + user_data: New user data Returns: - True if update successful, False otherwise + Updated user or error dict """ - try: - user = db.query(User).filter(User.id == user_id).first() - if user: - user.last_login = datetime.now() - db.commit() - return True - except Exception: - db.rollback() - return False - return False + db_user = db.query(User).filter(User.id == user_id).first() + if not db_user: + return {"error": "User not found"} -def format_user_display_name(user: User) -> str: - """ - Format user's display name. - - Args: - user: User object - - Returns: - Formatted display name - """ - if user.first_name and user.last_name: - return f"{user.first_name} {user.last_name}" - elif user.first_name: - return user.first_name - elif user.last_name: - return user.last_name - else: - return user.email.split('@')[0] \ No newline at end of file + if user_data.email and user_data.email != db_user.email: + if not validate_email(user_data.email): + return {"error": "Invalid email format"} + existing_user = get_user_by_email(db, user_data.email) + if existing_user: + return {"error": "Email already registered"} + + if user_data.phone and not validate_phone(user_data.phone): + return {"error": "Invalid phone number format"} + + for field, value in user_data.dict(exclude_unset=True).items(): + if field == "password": + value = hash_password(value) + setattr(db_user, field, value) + + db.commit() + db.refresh(db_user) + return db_user \ No newline at end of file