134 lines
3.7 KiB
Python
134 lines
3.7 KiB
Python
from typing import List, Dict, Optional, Union, Any
|
|
from datetime import datetime
|
|
import re
|
|
from sqlalchemy.orm import Session
|
|
from models.user import User
|
|
from schemas.user import UserCreate, UserUpdate
|
|
|
|
def validate_phone_number(phone: Optional[str]) -> bool:
    """
    Validate phone number format if provided.

    A missing phone number (``None`` or an empty string) is accepted because
    the check only applies when a value is present. Otherwise the whole value
    must be an optional leading ``+``, an optional ``1``, and then 9 to 15
    ASCII digits, with nothing before or after.

    Args:
        phone: Phone number to validate

    Returns:
        bool: True if format is valid or phone is None/empty, False otherwise
    """
    if not phone:
        return True
    # fullmatch replaces match + "$": "$" also matches just before a trailing
    # newline, which let values such as "1234567890\n" pass validation.
    # re.ASCII stops \d from accepting non-ASCII Unicode digits.
    pattern = r'\+?1?\d{9,15}'
    return re.fullmatch(pattern, phone, flags=re.ASCII) is not None
|
|
|
|
def validate_email(email: str) -> bool:
    """
    Validate email address format.

    The whole string must look like ``local@domain.tld``: a local part made of
    letters, digits and ``_ . + -``, an ``@``, a domain label of letters,
    digits and ``-``, a dot, and a remainder of letters, digits, ``-`` and
    ``.``. This is a format check only; it does not verify that the address
    exists.

    Args:
        email: Email to validate

    Returns:
        bool: True if format is valid, False otherwise
    """
    # fullmatch replaces match + "$": "$" also matches just before a trailing
    # newline, so "user@example.com\n" used to be accepted.
    pattern = r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9.-]+'
    return re.fullmatch(pattern, email) is not None
|
|
|
|
def format_user_data(user: User) -> Dict[str, Any]:
    """
    Format user data for API response.

    Args:
        user: User object to format

    Returns:
        Dict containing formatted user data. ``phone`` is normalised to None
        when it is empty, and ``created_at`` is an ISO 8601 string, or None
        when the attribute is missing or has no value.
    """
    # hasattr() alone is not enough: the attribute can exist and still be
    # None, in which case calling .isoformat() raised AttributeError.
    created_at = getattr(user, 'created_at', None)
    return {
        "name": user.name,
        "address": user.address,
        "phone": user.phone if user.phone else None,
        "email": user.email,
        "is_active": user.is_active,
        "created_at": created_at.isoformat() if created_at is not None else None,
    }
|
|
|
|
def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
    """
    Create new user with validation and error handling.

    The email and phone formats are validated first, then the email is checked
    against existing users, and only then is the new row added and committed.
    If the commit fails the session is rolled back so it stays usable.

    Args:
        db: Database session
        user_data: User creation data

    Returns:
        User object if created successfully, error dict otherwise

    Raises:
        sqlalchemy.exc.SQLAlchemyError: If the commit fails for a reason other
            than the email having been registered in the meantime. The
            session is rolled back before the error propagates.
    """
    # Function-scope import keeps this block self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError, SQLAlchemyError

    if not validate_email(user_data.email):
        return {"error": "Invalid email format"}

    if not validate_phone_number(user_data.phone):
        return {"error": "Invalid phone number format"}

    def email_is_taken() -> bool:
        return db.query(User).filter(User.email == user_data.email).first() is not None

    if email_is_taken():
        return {"error": "Email already registered"}

    db_user = User(
        name=user_data.name,
        address=user_data.address,
        phone=user_data.phone,
        email=user_data.email,
        is_active=True,
    )
    db.add(db_user)

    try:
        db.commit()
    except IntegrityError:
        # The check above and the insert are not atomic. Another request may
        # have stored the same email in between (presumably rejected by a
        # unique constraint on the email column; the model is not visible
        # here). Re-check after rolling back so the duplicate error is only
        # reported when it is really the cause.
        db.rollback()
        if email_is_taken():
            return {"error": "Email already registered"}
        raise
    except SQLAlchemyError:
        # Leave the session in a clean state for the caller.
        db.rollback()
        raise

    db.refresh(db_user)
    return db_user
|
|
|
|
def update_user_safely(
    db: Session,
    user: User,
    user_data: UserUpdate
) -> Union[User, Dict[str, str]]:
    """
    Update user data with validation.

    A supplied email that differs from the current one must have a valid
    format and must not belong to another user. A supplied phone number must
    have a valid format. Only the fields explicitly set on ``user_data`` are
    copied onto ``user``. If the commit fails the session is rolled back so
    it stays usable.

    Args:
        db: Database session
        user: Existing user to update
        user_data: Update data

    Returns:
        Updated user object if successful, error dict otherwise

    Raises:
        sqlalchemy.exc.SQLAlchemyError: If the commit fails for a reason other
            than the new email having been taken in the meantime. The session
            is rolled back before the error propagates.
    """
    # Function-scope import keeps this block self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError, SQLAlchemyError

    new_email = user_data.email
    # "is not None" rather than truthiness: an explicitly supplied empty
    # string used to skip validation and was then written to the user.
    email_changed = new_email is not None and new_email != user.email

    def email_is_taken() -> bool:
        return db.query(User).filter(User.email == new_email).first() is not None

    if email_changed:
        if not validate_email(new_email):
            return {"error": "Invalid email format"}
        if email_is_taken():
            return {"error": "Email already exists"}

    # validate_phone_number accepts None/empty itself, so no extra guard is
    # needed; this mirrors the check in create_user_safely.
    if not validate_phone_number(user_data.phone):
        return {"error": "Invalid phone number format"}

    for field, value in user_data.dict(exclude_unset=True).items():
        setattr(user, field, value)

    try:
        db.commit()
    except IntegrityError:
        # The uniqueness check and the commit are not atomic. After rolling
        # back, the pending email change is discarded, so a row found for
        # new_email can only come from a different user.
        db.rollback()
        if email_changed and email_is_taken():
            return {"error": "Email already exists"}
        raise
    except SQLAlchemyError:
        # Leave the session in a clean state for the caller.
        db.rollback()
        raise

    db.refresh(user)
    return user
|
|
|
|
def get_active_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """
    Fetch one page of users whose ``is_active`` flag is set.

    Args:
        db: Database session
        skip: How many matching records to skip before the page starts
        limit: Upper bound on the number of records in the page

    Returns:
        List of active user objects
    """
    # The "== True" comparison is deliberate: it is passed to filter() as a
    # query expression (User.is_active is presumably a SQLAlchemy column), so
    # it must not be rewritten as "is True".
    active_only = db.query(User).filter(User.is_active == True)  # noqa: E712
    page = active_only.offset(skip).limit(limit)
    return page.all()