175 lines
4.8 KiB
Python
175 lines
4.8 KiB
Python
from typing import List, Dict, Optional, Union, Any
|
|
from datetime import datetime
|
|
import re
|
|
from sqlalchemy.orm import Session
|
|
from passlib.context import CryptContext
|
|
from models.user import User
|
|
from schemas.user import UserCreate, UserUpdate
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
def validate_user_data(user_data: UserCreate) -> Dict[str, str]:
|
|
"""
|
|
Validate user input data.
|
|
|
|
Args:
|
|
user_data: User data to validate
|
|
|
|
Returns:
|
|
Dict with error messages if validation fails, empty dict if valid
|
|
"""
|
|
errors = {}
|
|
|
|
if not user_data.first_name or len(user_data.first_name) < 2:
|
|
errors["first_name"] = "First name must be at least 2 characters"
|
|
|
|
if not user_data.last_name or len(user_data.last_name) < 2:
|
|
errors["last_name"] = "Last name must be at least 2 characters"
|
|
|
|
if not validate_email(user_data.email):
|
|
errors["email"] = "Invalid email format"
|
|
|
|
if not validate_phone(user_data.phone):
|
|
errors["phone"] = "Invalid phone number format"
|
|
|
|
if not user_data.password or len(user_data.password) < 8:
|
|
errors["password"] = "Password must be at least 8 characters"
|
|
|
|
return errors
|
|
|
|
def validate_email(email: str) -> bool:
|
|
"""
|
|
Validate email format.
|
|
|
|
Args:
|
|
email: Email to validate
|
|
|
|
Returns:
|
|
bool: True if valid, False otherwise
|
|
"""
|
|
pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
|
|
return bool(re.match(pattern, email))
|
|
|
|
def validate_phone(phone: str) -> bool:
|
|
"""
|
|
Validate phone number format.
|
|
|
|
Args:
|
|
phone: Phone number to validate
|
|
|
|
Returns:
|
|
bool: True if valid, False otherwise
|
|
"""
|
|
pattern = r'^\+?1?\d{9,15}$'
|
|
return bool(re.match(pattern, phone))
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""
|
|
Hash a password using bcrypt.
|
|
|
|
Args:
|
|
password: Plain text password
|
|
|
|
Returns:
|
|
str: Hashed password
|
|
"""
|
|
return pwd_context.hash(password)
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
"""
|
|
Verify a password against its hash.
|
|
|
|
Args:
|
|
plain_password: Password to verify
|
|
hashed_password: Stored hash to check against
|
|
|
|
Returns:
|
|
bool: True if password matches, False otherwise
|
|
"""
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[User]:
|
|
"""
|
|
Get user by email.
|
|
|
|
Args:
|
|
db: Database session
|
|
email: Email to search for
|
|
|
|
Returns:
|
|
User if found, None otherwise
|
|
"""
|
|
return db.query(User).filter(User.email == email).first()
|
|
|
|
def create_user_safely(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
|
|
"""
|
|
Create new user with validation.
|
|
|
|
Args:
|
|
db: Database session
|
|
user_data: User data
|
|
|
|
Returns:
|
|
Created user or error dict
|
|
"""
|
|
validation_errors = validate_user_data(user_data)
|
|
if validation_errors:
|
|
return {"errors": validation_errors}
|
|
|
|
existing_user = get_user_by_email(db, user_data.email)
|
|
if existing_user:
|
|
return {"error": "Email already registered"}
|
|
|
|
hashed_password = hash_password(user_data.password)
|
|
|
|
db_user = User(
|
|
first_name=user_data.first_name,
|
|
last_name=user_data.last_name,
|
|
email=user_data.email,
|
|
phone=user_data.phone,
|
|
password=hashed_password,
|
|
address=user_data.address,
|
|
city=user_data.city,
|
|
country=user_data.country
|
|
)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
|
|
return db_user
|
|
|
|
def update_user_safely(db: Session, user_id: int, user_data: UserUpdate) -> Union[User, Dict[str, str]]:
|
|
"""
|
|
Update user data with validation.
|
|
|
|
Args:
|
|
db: Database session
|
|
user_id: ID of user to update
|
|
user_data: New user data
|
|
|
|
Returns:
|
|
Updated user or error dict
|
|
"""
|
|
db_user = db.query(User).filter(User.id == user_id).first()
|
|
if not db_user:
|
|
return {"error": "User not found"}
|
|
|
|
if user_data.email and user_data.email != db_user.email:
|
|
if not validate_email(user_data.email):
|
|
return {"error": "Invalid email format"}
|
|
existing_user = get_user_by_email(db, user_data.email)
|
|
if existing_user:
|
|
return {"error": "Email already registered"}
|
|
|
|
if user_data.phone and not validate_phone(user_data.phone):
|
|
return {"error": "Invalid phone number format"}
|
|
|
|
for field, value in user_data.dict(exclude_unset=True).items():
|
|
if field == "password":
|
|
value = hash_password(value)
|
|
setattr(db_user, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user |