bosch-3671ii/helpers/user_helpers.py
2025-03-28 05:57:22 -05:00

123 lines
3.3 KiB
Python

import re
from datetime import datetime, timezone
from typing import List, Dict, Optional, Union

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from models.user import User
from schemas.user import UserCreate, UserUpdate


def validate_email(email: str) -> bool:
    """
    Validate an email address format.

    The whole string must match the pattern; trailing characters such as a
    newline make the address invalid.

    Args:
        email: The email address to validate

    Returns:
        bool: True if email format is valid, False otherwise (including when
        ``email`` is not a string)
    """
    # Non-string input (e.g. None from an optional field) is simply invalid
    # rather than a TypeError from the regex engine.
    if not isinstance(email, str):
        return False
    pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
    # fullmatch, not match: with match() the trailing `$` also matches just
    # before a final "\n", so "user@example.com\n" would be accepted.
    return re.fullmatch(pattern, email) is not None
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """
    Look up a single user whose email column equals the given address.

    Args:
        db: Database session used to run the query
        email: Email to search for

    Returns:
        The first matching User, or None when the query yields no row
    """
    users_with_email = db.query(User).filter(User.email == email)
    return users_with_email.first()
def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
    """
    Create a new user with validation and error handling.

    Args:
        db: Database session
        user_data: User data for creation; ``username``, ``email`` and
            ``password`` are read from it

    Returns:
        User object if created successfully, otherwise an error dict of the
        form ``{"error": "<message>"}``

    Raises:
        IntegrityError: If the commit violates a database constraint for a
            reason other than the email already being registered. The
            session is rolled back before the exception propagates.
    """
    # Check if email is valid
    if not validate_email(user_data.email):
        return {"error": "Invalid email format"}

    # Check if user already exists
    if get_user_by_email(db, user_data.email):
        return {"error": "Email already registered"}

    # NOTE(security): the password is stored exactly as received. It must be
    # hashed before this is used with real credentials; not changed here
    # because the verification side is outside this block.
    db_user = User(
        username=user_data.username,
        email=user_data.email,
        password=user_data.password,
    )
    db.add(db_user)

    try:
        db.commit()
    except IntegrityError:
        # The existence check above is not atomic with the insert: if the
        # database enforces uniqueness on email (constraint not visible
        # here), a concurrent registration surfaces as IntegrityError.
        # Roll back so the session is usable again, then report the
        # duplicate the same way as the pre-check would have.
        db.rollback()
        if get_user_by_email(db, user_data.email):
            return {"error": "Email already registered"}
        # Some other constraint failed; let the caller see it.
        raise

    db.refresh(db_user)
    return db_user
def update_user(
    db: Session, user_id: int, user_data: UserUpdate
) -> Union[User, Dict[str, str]]:
    """
    Update an existing user's information.

    Only fields of ``user_data`` that are truthy are applied; all validation
    happens before the user object is modified, so an error return leaves
    the user untouched.

    Args:
        db: Database session
        user_id: ID of the user to update
        user_data: Updated user data; ``username``, ``email`` and
            ``password`` are read from it

    Returns:
        User object if updated successfully, otherwise an error dict of the
        form ``{"error": "<message>"}``

    Raises:
        IntegrityError: If the commit violates a database constraint for a
            reason other than the email belonging to another user. The
            session is rolled back before the exception propagates.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        return {"error": "User not found"}

    # Validate before mutating: assigning to `user` first would leave a
    # half-applied change pending on the session if a later check fails,
    # and a subsequent commit elsewhere would persist it.
    if user_data.email:
        # Validate email format
        if not validate_email(user_data.email):
            return {"error": "Invalid email format"}
        # Check for duplicate email (the user's own current email is fine)
        existing_user = get_user_by_email(db, user_data.email)
        if existing_user and existing_user.id != user_id:
            return {"error": "Email already registered"}

    # Update user fields
    if user_data.username:
        user.username = user_data.username
    if user_data.email:
        user.email = user_data.email
    if user_data.password:
        # NOTE(security): stored exactly as received; hash before real use.
        user.password = user_data.password

    try:
        db.commit()
    except IntegrityError:
        # The duplicate check above is not atomic with the update; roll back
        # so the session is usable, then report an email clash the same way
        # as the pre-check. Anything else is re-raised unchanged.
        db.rollback()
        if user_data.email:
            clash = get_user_by_email(db, user_data.email)
            if clash and clash.id != user_id:
                return {"error": "Email already registered"}
        raise

    db.refresh(user)
    return user
def get_user_age(user: User) -> int:
    """
    Calculate the age of a user based on their date of birth.

    The age is measured against today's date in UTC.

    Args:
        user: User object; its ``date_of_birth`` attribute is read and, when
            set, must expose ``year``, ``month`` and ``day``

    Returns:
        int: Completed years since the date of birth, or 0 when
        ``date_of_birth`` is not set. No clamping is applied, so a date of
        birth in the future yields a negative value.
    """
    dob = user.date_of_birth
    if not dob:
        return 0

    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in
    # UTC gives the same calendar date without the DeprecationWarning.
    today = datetime.now(timezone.utc).date()

    # True (== 1) while this year's birthday is still ahead of today.
    birthday_pending = (today.month, today.day) < (dob.month, dob.day)
    return today.year - dob.year - int(birthday_pending)