from typing import Optional, Dict, Union from datetime import datetime from sqlalchemy.orm import Session from models.auth import Auth from schemas.auth import AuthCreate import uuid def validate_token(token: str) -> bool: """ Validate that a token is in the correct format. Args: token: Authentication token to validate Returns: bool: True if token format is valid, False otherwise """ try: # Check if token is valid UUID format uuid.UUID(token) return True except ValueError: return False def get_active_auth_sessions(db: Session, user_id: str) -> list[Auth]: """ Get all active authentication sessions for a user. Args: db: Database session user_id: ID of the user Returns: List of active Auth objects for the user """ return db.query(Auth).filter(Auth.user_id == user_id).all() def invalidate_auth_token(db: Session, token: str) -> Union[Auth, Dict[str, str]]: """ Invalidate a specific authentication token. Args: db: Database session token: Token to invalidate Returns: Auth object if found and invalidated, error dict otherwise """ auth_session = db.query(Auth).filter(Auth.token == token).first() if not auth_session: return {"error": "Invalid or expired token"} db.delete(auth_session) db.commit() return auth_session def logout_all_devices(db: Session, user_id: str) -> bool: """ Logout user from all devices by invalidating all auth tokens. Args: db: Database session user_id: ID of the user to logout Returns: bool: True if successful, False otherwise """ try: db.query(Auth).filter(Auth.user_id == user_id).delete() db.commit() return True except Exception: db.rollback() return False def get_device_sessions(db: Session, user_id: str, device_id: Optional[str]) -> list[Auth]: """ Get authentication sessions for a specific device. Args: db: Database session user_id: ID of the user device_id: Optional device ID to filter by Returns: List of Auth objects matching the criteria """ query = db.query(Auth).filter(Auth.user_id == user_id) if device_id: query = query.filter(Auth.device_id == device_id) return query.all()