Add helper functions for User
This commit is contained in:
parent
bc7e94353a
commit
3b8dcd8794
135
helpers/user_helpers.py
Normal file
135
helpers/user_helpers.py
Normal file
@ -0,0 +1,135 @@
|
||||
from typing import List, Dict, Optional, Union
|
||||
from datetime import datetime
|
||||
import re
|
||||
from sqlalchemy.orm import Session
|
||||
from fastapi import UploadFile
|
||||
import os
|
||||
from models.user import User, UserRoles
|
||||
from schemas.user import UserCreate, UserUpdate
|
||||
from PIL import Image
|
||||
|
||||
def validate_user_data(user_data: Union[UserCreate, UserUpdate]) -> Dict[str, str]:
|
||||
"""
|
||||
Validate user input data.
|
||||
|
||||
Args:
|
||||
user_data: User data to validate
|
||||
|
||||
Returns:
|
||||
Dict with error messages if validation fails, empty dict if valid
|
||||
"""
|
||||
errors = {}
|
||||
|
||||
# Email validation
|
||||
if hasattr(user_data, 'email'):
|
||||
email_pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
|
||||
if not re.match(email_pattern, user_data.email):
|
||||
errors['email'] = "Invalid email format"
|
||||
|
||||
# Phone validation
|
||||
if hasattr(user_data, 'phone_number'):
|
||||
phone_pattern = r'^\+?1?\d{9,15}$'
|
||||
if not re.match(phone_pattern, user_data.phone_number):
|
||||
errors['phone'] = "Invalid phone number format"
|
||||
|
||||
# Age validation
|
||||
if hasattr(user_data, 'date_of_birth'):
|
||||
age = (datetime.now() - user_data.date_of_birth).days / 365
|
||||
if age < 5: # Minimum age for school
|
||||
errors['age'] = "User must be at least 5 years old"
|
||||
|
||||
return errors
|
||||
|
||||
async def save_profile_picture(user_id: int, file: UploadFile) -> Optional[str]:
|
||||
"""
|
||||
Save and process user profile picture.
|
||||
|
||||
Args:
|
||||
user_id: ID of the user
|
||||
file: Uploaded image file
|
||||
|
||||
Returns:
|
||||
Path to saved image or None if failed
|
||||
"""
|
||||
try:
|
||||
# Create directory if not exists
|
||||
upload_dir = f"uploads/profile_pictures/{user_id}"
|
||||
os.makedirs(upload_dir, exist_ok=True)
|
||||
|
||||
# Save file path
|
||||
file_path = f"{upload_dir}/{file.filename}"
|
||||
|
||||
# Save uploaded file
|
||||
with open(file_path, "wb") as buffer:
|
||||
content = await file.read()
|
||||
buffer.write(content)
|
||||
|
||||
# Process image - resize to standard size
|
||||
with Image.open(file_path) as img:
|
||||
img = img.resize((200, 200))
|
||||
img.save(file_path)
|
||||
|
||||
return file_path
|
||||
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def get_users_by_role(db: Session, role: UserRoles, active_only: bool = True) -> List[User]:
|
||||
"""
|
||||
Get all users with specific role.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
role: Role to filter by
|
||||
active_only: Whether to return only active users
|
||||
|
||||
Returns:
|
||||
List of users with specified role
|
||||
"""
|
||||
query = db.query(User).filter(User.role == role)
|
||||
|
||||
if active_only:
|
||||
query = query.filter(User.is_active == True)
|
||||
|
||||
return query.all()
|
||||
|
||||
def update_last_login(db: Session, user_id: int) -> bool:
|
||||
"""
|
||||
Update user's last login timestamp.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
user_id: ID of user to update
|
||||
|
||||
Returns:
|
||||
True if update successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
user = db.query(User).filter(User.id == user_id).first()
|
||||
if user:
|
||||
user.last_login = datetime.now()
|
||||
db.commit()
|
||||
return True
|
||||
except Exception:
|
||||
db.rollback()
|
||||
return False
|
||||
return False
|
||||
|
||||
def format_user_display_name(user: User) -> str:
|
||||
"""
|
||||
Format user's display name.
|
||||
|
||||
Args:
|
||||
user: User object
|
||||
|
||||
Returns:
|
||||
Formatted display name
|
||||
"""
|
||||
if user.first_name and user.last_name:
|
||||
return f"{user.first_name} {user.last_name}"
|
||||
elif user.first_name:
|
||||
return user.first_name
|
||||
elif user.last_name:
|
||||
return user.last_name
|
||||
else:
|
||||
return user.email.split('@')[0]
|
Loading…
x
Reference in New Issue
Block a user