Add helper functions for User

This commit is contained in:
Backend IM Bot 2025-03-27 21:49:35 +00:00
parent 37864e81b5
commit 2cfc6696c9

View File

@ -2,133 +2,137 @@ from typing import List, Dict, Optional, Union, Any
from datetime import datetime from datetime import datetime
import re import re
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import or_
from models.user import User from models.user import User
from schemas.user import UserCreate, UserUpdate from schemas.user import UserCreate, UserUpdate
from passlib.context import CryptContext
def validate_phone_number(phone: Optional[str]) -> bool: pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def validate_username(username: str) -> bool:
""" """
Validate phone number format if provided. Validate username format and length.
Args: Args:
phone: Phone number to validate username: Username to validate
Returns: Returns:
bool: True if format is valid or phone is None, False otherwise bool: True if valid, False otherwise
""" """
if not phone: pattern = r'^[a-zA-Z0-9_]{3,32}$'
return True return bool(re.match(pattern, username))
pattern = r'^\+?1?\d{9,15}$'
return bool(re.match(pattern, phone))
def validate_email(email: str) -> bool: def hash_password(password: str) -> str:
""" """
Validate email address format. Hash a password using bcrypt.
Args: Args:
email: Email to validate password: Plain text password
Returns: Returns:
bool: True if format is valid, False otherwise str: Hashed password
""" """
pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' return pwd_context.hash(password)
return bool(re.match(pattern, email))
def format_user_data(user: User) -> Dict[str, Any]: def verify_password(plain_password: str, hashed_password: str) -> bool:
""" """
Format user data for API response. Verify a password against its hash.
Args:
plain_password: Password to verify
hashed_password: Stored hash to check against
Returns:
bool: True if password matches
"""
return pwd_context.verify(plain_password, hashed_password)
def search_users(
db: Session,
search_term: str,
skip: int = 0,
limit: int = 10
) -> List[User]:
"""
Search for users by username, first name, or last name.
Args:
db: Database session
search_term: Term to search for
skip: Number of records to skip
limit: Max number of records to return
Returns:
List of matching users
"""
return db.query(User).filter(
or_(
User.username.ilike(f"%{search_term}%"),
User.first_name.ilike(f"%{search_term}%"),
User.last_name.ilike(f"%{search_term}%")
)
).offset(skip).limit(limit).all()
def update_last_login(db: Session, user: User) -> User:
"""
Update user's last login timestamp.
Args:
db: Database session
user: User to update
Returns:
Updated user object
"""
user.last_login = datetime.utcnow()
db.commit()
db.refresh(user)
return user
def format_user_profile(user: User) -> Dict[str, Any]:
"""
Format user data for profile display.
Args: Args:
user: User object to format user: User object to format
Returns: Returns:
Dict containing formatted user data Dict containing formatted user profile data
""" """
return { return {
"name": user.name, "username": user.username,
"address": user.address, "full_name": f"{user.first_name or ''} {user.last_name or ''}".strip(),
"phone": user.phone if user.phone else None, "bio": user.bio or "No bio available",
"email": user.email, "profile_picture": user.profile_picture or "default.jpg",
"is_active": user.is_active, "relationship_status": user.relationship_status or "Not specified",
"created_at": user.created_at.isoformat() if hasattr(user, 'created_at') else None "interests": user.interests.split(",") if user.interests else [],
"last_active": user.last_login.isoformat() if user.last_login else None
} }
def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]: def validate_user_update(user_data: UserUpdate) -> Dict[str, str]:
""" """
Create new user with validation and error handling. Validate user update data.
Args: Args:
db: Database session user_data: Update data to validate
user_data: User creation data
Returns: Returns:
User object if created successfully, error dict otherwise Dict with error messages if validation fails
""" """
if not validate_email(user_data.email): errors = {}
return {"error": "Invalid email format"}
if not validate_phone_number(user_data.phone): if user_data.username and not validate_username(user_data.username):
return {"error": "Invalid phone number format"} errors["username"] = "Invalid username format"
existing_user = db.query(User).filter(User.email == user_data.email).first() if user_data.email and not validate_email(user_data.email):
if existing_user: errors["email"] = "Invalid email format"
return {"error": "Email already registered"}
db_user = User( if user_data.profile_picture and not user_data.profile_picture.endswith(('.jpg','.png','.jpeg')):
name=user_data.name, errors["profile_picture"] = "Invalid image format"
address=user_data.address,
phone=user_data.phone,
email=user_data.email,
is_active=True
)
db.add(db_user) if user_data.relationship_status and user_data.relationship_status not in [
db.commit() "Single", "In a relationship", "Married", "It's complicated"
db.refresh(db_user) ]:
errors["relationship_status"] = "Invalid relationship status"
return db_user return errors
def update_user_safely(
db: Session,
user: User,
user_data: UserUpdate
) -> Union[User, Dict[str, str]]:
"""
Update user data with validation.
Args:
db: Database session
user: Existing user to update
user_data: Update data
Returns:
Updated user object if successful, error dict otherwise
"""
if user_data.email and user_data.email != user.email:
if not validate_email(user_data.email):
return {"error": "Invalid email format"}
if db.query(User).filter(User.email == user_data.email).first():
return {"error": "Email already exists"}
if user_data.phone and not validate_phone_number(user_data.phone):
return {"error": "Invalid phone number format"}
for field, value in user_data.dict(exclude_unset=True).items():
setattr(user, field, value)
db.commit()
db.refresh(user)
return user
def get_active_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
"""
Get paginated list of active users.
Args:
db: Database session
skip: Number of records to skip
limit: Max number of records to return
Returns:
List of active user objects
"""
return db.query(User).filter(User.is_active == True).offset(skip).limit(limit).all()