123 lines
3.3 KiB
Python
123 lines
3.3 KiB
Python
from typing import List, Dict, Optional, Union
|
|
import re
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
from models.user import User
|
|
from schemas.user import UserCreate, UserUpdate
|
|
|
|
def validate_email(email: str) -> bool:
    """
    Validate an email address format.

    The whole string must match the pattern: one or more local-part
    characters, an ``@``, a domain label, a dot, and a remainder made of
    letters, digits, hyphens and dots.

    Args:
        email: The email address to validate

    Returns:
        bool: True if email format is valid, False otherwise (including when
        ``email`` is not a string)
    """
    # A non-string (e.g. None from an optional field) is simply not a valid
    # address; report that instead of letting ``re`` raise TypeError.
    if not isinstance(email, str):
        return False

    pattern = r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+'
    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """
    Look up a single user by email address.

    Args:
        db: Database session used to run the query
        email: Email address to match against ``User.email``

    Returns:
        The first matching User, or None when no row matches
    """
    users = db.query(User)
    matching = users.filter(User.email == email)
    return matching.first()
|
|
|
|
def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
    """
    Create a new user with validation and error handling.

    The email format is validated and checked against existing users before
    the insert. If the commit fails, the session is rolled back so it stays
    usable for the caller.

    Args:
        db: Database session
        user_data: User data for creation

    Returns:
        User object if created successfully, error dict otherwise

    Raises:
        SQLAlchemyError: If the commit fails for a reason other than an
            integrity violation (re-raised after rolling back).
    """
    # Imported at function scope so this block does not rely on changes to
    # the module-level import section.
    from sqlalchemy.exc import IntegrityError, SQLAlchemyError

    # Check if email is valid
    if not validate_email(user_data.email):
        return {"error": "Invalid email format"}

    # Check if user already exists
    existing_user = get_user_by_email(db, user_data.email)
    if existing_user:
        return {"error": "Email already registered"}

    # Create the user
    db_user = User(
        username=user_data.username,
        email=user_data.email,
        password=user_data.password,  # In a real app, hash this password
    )

    db.add(db_user)
    try:
        db.commit()
        db.refresh(db_user)
    except IntegrityError:
        # The existence check above is not atomic with the insert: another
        # transaction may have created a conflicting row in between. Which
        # constraint fired is not inspected here, hence the generic message.
        db.rollback()
        return {"error": "User already exists"}
    except SQLAlchemyError:
        # Leave the session in a clean state, then let the caller see the
        # underlying failure.
        db.rollback()
        raise

    return db_user
|
|
|
|
def update_user(db: Session, user_id: int, user_data: UserUpdate) -> Union[User, Dict[str, str]]:
    """
    Update an existing user's information.

    Only fields with a truthy value in ``user_data`` are applied. All
    validation runs before any attribute is changed, so an error return
    never leaves a half-modified user pending in the session.

    Args:
        db: Database session
        user_id: ID of the user to update
        user_data: Updated user data

    Returns:
        User object if updated successfully, error dict otherwise

    Raises:
        SQLAlchemyError: If the commit fails for a reason other than an
            integrity violation (re-raised after rolling back).
    """
    # Imported at function scope so this block does not rely on changes to
    # the module-level import section.
    from sqlalchemy.exc import IntegrityError, SQLAlchemyError

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        return {"error": "User not found"}

    # Validate first, mutate afterwards: assigning the username before the
    # email checks would leave that change pending in the session even when
    # this function returns an error.
    if user_data.email:
        # Validate email format
        if not validate_email(user_data.email):
            return {"error": "Invalid email format"}
        # Check for duplicate email
        existing_user = get_user_by_email(db, user_data.email)
        if existing_user and existing_user.id != user_id:
            return {"error": "Email already registered"}

    # Update user fields
    if user_data.username:
        user.username = user_data.username
    if user_data.email:
        user.email = user_data.email
    if user_data.password:
        user.password = user_data.password  # In a real app, hash this password

    try:
        db.commit()
        db.refresh(user)
    except IntegrityError:
        # The duplicate check above is not atomic with the update; a
        # concurrent write can still trip a database constraint. Which
        # constraint fired is not inspected here, hence the generic message.
        db.rollback()
        return {"error": "Update conflicts with an existing user"}
    except SQLAlchemyError:
        # Leave the session in a clean state, then let the caller see the
        # underlying failure.
        db.rollback()
        raise

    return user
|
|
|
|
def get_user_age(user: User) -> int:
    """
    Calculate the age of a user based on their date of birth.

    The age is the number of full years between ``user.date_of_birth`` and
    the current UTC date.

    Args:
        user: User object; only its ``date_of_birth`` attribute is read

    Returns:
        int: Age of the user in whole years, or 0 when no date of birth is set
    """
    # Function-scope import: the module only imports ``datetime`` itself.
    from datetime import timezone

    born = user.date_of_birth
    if not born:
        return 0

    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in
    # UTC yields the same calendar date.
    today = datetime.now(timezone.utc).date()

    age = today.year - born.year
    # Tuple comparison: True while this year's birthday is still ahead.
    if (today.month, today.day) < (born.month, born.day):
        age -= 1

    return age