diff --git a/helpers/user_helpers.py b/helpers/user_helpers.py new file mode 100644 index 0000000..cb84e8f --- /dev/null +++ b/helpers/user_helpers.py @@ -0,0 +1,161 @@ +from typing import Optional, Dict, Union, List +from datetime import datetime +import re +from sqlalchemy.orm import Session +from passlib.context import CryptContext +from models.user import User +from schemas.user import UserCreate, UserUpdate + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +def validate_phone_number(phone: str) -> bool: + """ + Validate phone number format. + + Args: + phone: Phone number to validate + + Returns: + bool: True if valid format, False otherwise + """ + pattern = r'^\+?1?\d{9,15}$' + return bool(re.match(pattern, phone)) + +def validate_bar_number(bar_number: str) -> bool: + """ + Validate lawyer bar number format. + + Args: + bar_number: Bar number to validate + + Returns: + bool: True if valid format, False otherwise + """ + pattern = r'^[A-Z0-9]{5,12}$' + return bool(re.match(pattern, bar_number)) + +def hash_password(password: str) -> str: + """ + Hash a password using bcrypt. + + Args: + password: Plain text password + + Returns: + str: Hashed password + """ + return pwd_context.hash(password) + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """ + Verify a password against its hash. + + Args: + plain_password: Password to verify + hashed_password: Stored hash to check against + + Returns: + bool: True if password matches, False otherwise + """ + return pwd_context.verify(plain_password, hashed_password) + +def create_lawyer_account( + db: Session, + user_data: UserCreate, + bar_number: str +) -> Union[User, Dict[str, str]]: + """ + Create a new lawyer account with validation. + + Args: + db: Database session + user_data: User creation data + bar_number: Lawyer's bar number + + Returns: + User object if created successfully, error dict otherwise + """ + if not validate_bar_number(bar_number): + return {"error": "Invalid bar number format"} + + existing_bar = db.query(User).filter(User.bar_number == bar_number).first() + if existing_bar: + return {"error": "Bar number already registered"} + + if not validate_phone_number(user_data.phone_number): + return {"error": "Invalid phone number format"} + + db_user = User( + first_name=user_data.first_name, + last_name=user_data.last_name, + email=user_data.email, + password=hash_password(user_data.password), + phone_number=user_data.phone_number, + role="lawyer", + bar_number=bar_number, + is_active=True, + address=user_data.address, + profile_picture=user_data.profile_picture + ) + + db.add(db_user) + db.commit() + db.refresh(db_user) + + return db_user + +def update_user_last_login(db: Session, user_id: int) -> Optional[User]: + """ + Update user's last login timestamp. + + Args: + db: Database session + user_id: ID of user to update + + Returns: + Updated User object if successful, None otherwise + """ + user = db.query(User).filter(User.id == user_id).first() + if user: + user.last_login = datetime.utcnow() + db.commit() + db.refresh(user) + return user + return None + +def get_active_lawyers(db: Session, skip: int = 0, limit: int = 100) -> List[User]: + """ + Get list of active lawyers. + + Args: + db: Database session + skip: Number of records to skip + limit: Maximum number of records to return + + Returns: + List of active lawyer User objects + """ + return db.query(User).filter( + User.role == "lawyer", + User.is_active == True + ).offset(skip).limit(limit).all() + +def deactivate_user_account(db: Session, user_id: int) -> Union[User, Dict[str, str]]: + """ + Deactivate a user account. + + Args: + db: Database session + user_id: ID of user to deactivate + + Returns: + Updated User object if successful, error dict otherwise + """ + user = db.query(User).filter(User.id == user_id).first() + if not user: + return {"error": "User not found"} + + user.is_active = False + db.commit() + db.refresh(user) + return user \ No newline at end of file