Add helper functions for User

This commit is contained in:
Backend IM Bot 2025-03-28 04:36:08 +00:00
parent 95ef3923e2
commit edb4d0397f

View File

@ -1,14 +1,14 @@
from typing import List, Dict, Optional, Union
from typing import List, Dict, Optional, Union, Any
from datetime import datetime
import re
from sqlalchemy.orm import Session
from fastapi import UploadFile
import os
from models.user import User, UserRoles
from passlib.context import CryptContext
from models.user import User
from schemas.user import UserCreate, UserUpdate
from PIL import Image
def validate_user_data(user_data: Union[UserCreate, UserUpdate]) -> Dict[str, str]:
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def validate_user_data(user_data: UserCreate) -> Dict[str, str]:
"""
Validate user input data.
@ -20,116 +20,156 @@ def validate_user_data(user_data: Union[UserCreate, UserUpdate]) -> Dict[str, st
"""
errors = {}
# Email validation
if hasattr(user_data, 'email'):
email_pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
if not re.match(email_pattern, user_data.email):
errors['email'] = "Invalid email format"
if not user_data.first_name or len(user_data.first_name) < 2:
errors["first_name"] = "First name must be at least 2 characters"
# Phone validation
if hasattr(user_data, 'phone_number'):
phone_pattern = r'^\+?1?\d{9,15}$'
if not re.match(phone_pattern, user_data.phone_number):
errors['phone'] = "Invalid phone number format"
if not user_data.last_name or len(user_data.last_name) < 2:
errors["last_name"] = "Last name must be at least 2 characters"
# Age validation
if hasattr(user_data, 'date_of_birth'):
age = (datetime.now() - user_data.date_of_birth).days / 365
if age < 5: # Minimum age for school
errors['age'] = "User must be at least 5 years old"
if not validate_email(user_data.email):
errors["email"] = "Invalid email format"
if not validate_phone(user_data.phone):
errors["phone"] = "Invalid phone number format"
if not user_data.password or len(user_data.password) < 8:
errors["password"] = "Password must be at least 8 characters"
return errors
async def save_profile_picture(user_id: int, file: UploadFile) -> Optional[str]:
def validate_email(email: str) -> bool:
"""
Save and process user profile picture.
Validate email format.
Args:
user_id: ID of the user
file: Uploaded image file
email: Email to validate
Returns:
Path to saved image or None if failed
bool: True if valid, False otherwise
"""
try:
# Create directory if not exists
upload_dir = f"uploads/profile_pictures/{user_id}"
os.makedirs(upload_dir, exist_ok=True)
pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
return bool(re.match(pattern, email))
# Save file path
file_path = f"{upload_dir}/{file.filename}"
# Save uploaded file
with open(file_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
# Process image - resize to standard size
with Image.open(file_path) as img:
img = img.resize((200, 200))
img.save(file_path)
return file_path
except Exception:
return None
def get_users_by_role(db: Session, role: UserRoles, active_only: bool = True) -> List[User]:
def validate_phone(phone: str) -> bool:
"""
Get all users with specific role.
Validate phone number format.
Args:
phone: Phone number to validate
Returns:
bool: True if valid, False otherwise
"""
pattern = r'^\+?1?\d{9,15}$'
return bool(re.match(pattern, phone))
def hash_password(password: str) -> str:
"""
Hash a password using bcrypt.
Args:
password: Plain text password
Returns:
str: Hashed password
"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a password against its hash.
Args:
plain_password: Password to verify
hashed_password: Stored hash to check against
Returns:
bool: True if password matches, False otherwise
"""
return pwd_context.verify(plain_password, hashed_password)
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""
Get user by email.
Args:
db: Database session
role: Role to filter by
active_only: Whether to return only active users
email: Email to search for
Returns:
List of users with specified role
User if found, None otherwise
"""
query = db.query(User).filter(User.role == role)
return db.query(User).filter(User.email == email).first()
if active_only:
query = query.filter(User.is_active == True)
return query.all()
def update_last_login(db: Session, user_id: int) -> bool:
def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
"""
Update user's last login timestamp.
Create new user with validation.
Args:
db: Database session
user_data: User data
Returns:
Created user or error dict
"""
validation_errors = validate_user_data(user_data)
if validation_errors:
return {"errors": validation_errors}
existing_user = get_user_by_email(db, user_data.email)
if existing_user:
return {"error": "Email already registered"}
hashed_password = hash_password(user_data.password)
db_user = User(
first_name=user_data.first_name,
last_name=user_data.last_name,
email=user_data.email,
phone=user_data.phone,
password=hashed_password,
address=user_data.address,
city=user_data.city,
country=user_data.country
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user_safely(db: Session, user_id: int, user_data: UserUpdate) -> Union[User, Dict[str, str]]:
"""
Update user data with validation.
Args:
db: Database session
user_id: ID of user to update
user_data: New user data
Returns:
True if update successful, False otherwise
Updated user or error dict
"""
try:
user = db.query(User).filter(User.id == user_id).first()
if user:
user.last_login = datetime.now()
db.commit()
return True
except Exception:
db.rollback()
return False
return False
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
return {"error": "User not found"}
def format_user_display_name(user: User) -> str:
"""
Format user's display name.
if user_data.email and user_data.email != db_user.email:
if not validate_email(user_data.email):
return {"error": "Invalid email format"}
existing_user = get_user_by_email(db, user_data.email)
if existing_user:
return {"error": "Email already registered"}
Args:
user: User object
if user_data.phone and not validate_phone(user_data.phone):
return {"error": "Invalid phone number format"}
Returns:
Formatted display name
"""
if user.first_name and user.last_name:
return f"{user.first_name} {user.last_name}"
elif user.first_name:
return user.first_name
elif user.last_name:
return user.last_name
else:
return user.email.split('@')[0]
for field, value in user_data.dict(exclude_unset=True).items():
if field == "password":
value = hash_password(value)
setattr(db_user, field, value)
db.commit()
db.refresh(db_user)
return db_user