from typing import Optional, Dict, Union, List from datetime import datetime import re from sqlalchemy.orm import Session from passlib.context import CryptContext from models.user import User from schemas.user import UserCreate, UserUpdate pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def validate_phone_number(phone: str) -> bool: """ Validate phone number format. Args: phone: Phone number to validate Returns: bool: True if valid format, False otherwise """ pattern = r'^\+?1?\d{9,15}$' return bool(re.match(pattern, phone)) def validate_bar_number(bar_number: str) -> bool: """ Validate lawyer bar number format. Args: bar_number: Bar number to validate Returns: bool: True if valid format, False otherwise """ pattern = r'^[A-Z0-9]{5,12}$' return bool(re.match(pattern, bar_number)) def hash_password(password: str) -> str: """ Hash a password using bcrypt. Args: password: Plain text password Returns: str: Hashed password """ return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a password against its hash. Args: plain_password: Password to verify hashed_password: Stored hash to check against Returns: bool: True if password matches, False otherwise """ return pwd_context.verify(plain_password, hashed_password) def create_lawyer_account( db: Session, user_data: UserCreate, bar_number: str ) -> Union[User, Dict[str, str]]: """ Create a new lawyer account with validation. Args: db: Database session user_data: User creation data bar_number: Lawyer's bar number Returns: User object if created successfully, error dict otherwise """ if not validate_bar_number(bar_number): return {"error": "Invalid bar number format"} existing_bar = db.query(User).filter(User.bar_number == bar_number).first() if existing_bar: return {"error": "Bar number already registered"} if not validate_phone_number(user_data.phone_number): return {"error": "Invalid phone number format"} db_user = User( first_name=user_data.first_name, last_name=user_data.last_name, email=user_data.email, password=hash_password(user_data.password), phone_number=user_data.phone_number, role="lawyer", bar_number=bar_number, is_active=True, address=user_data.address, profile_picture=user_data.profile_picture ) db.add(db_user) db.commit() db.refresh(db_user) return db_user def update_user_last_login(db: Session, user_id: int) -> Optional[User]: """ Update user's last login timestamp. Args: db: Database session user_id: ID of user to update Returns: Updated User object if successful, None otherwise """ user = db.query(User).filter(User.id == user_id).first() if user: user.last_login = datetime.utcnow() db.commit() db.refresh(user) return user return None def get_active_lawyers(db: Session, skip: int = 0, limit: int = 100) -> List[User]: """ Get list of active lawyers. Args: db: Database session skip: Number of records to skip limit: Maximum number of records to return Returns: List of active lawyer User objects """ return db.query(User).filter( User.role == "lawyer", User.is_active == True ).offset(skip).limit(limit).all() def deactivate_user_account(db: Session, user_id: int) -> Union[User, Dict[str, str]]: """ Deactivate a user account. Args: db: Database session user_id: ID of user to deactivate Returns: Updated User object if successful, error dict otherwise """ user = db.query(User).filter(User.id == user_id).first() if not user: return {"error": "User not found"} user.is_active = False db.commit() db.refresh(user) return user