diff --git a/helpers/auth_helpers.py b/helpers/auth_helpers.py new file mode 100644 index 0000000..54b418c --- /dev/null +++ b/helpers/auth_helpers.py @@ -0,0 +1,95 @@ +from typing import Optional, Dict, Union +from datetime import datetime +from sqlalchemy.orm import Session +from models.auth import Auth +from schemas.auth import AuthCreate +import uuid + +def validate_token(token: str) -> bool: + """ + Validate that a token is in the correct format. + + Args: + token: Authentication token to validate + + Returns: + bool: True if token format is valid, False otherwise + """ + try: + # Check if token is valid UUID format + uuid.UUID(token) + return True + except ValueError: + return False + +def get_active_auth_sessions(db: Session, user_id: str) -> list[Auth]: + """ + Get all active authentication sessions for a user. + + Args: + db: Database session + user_id: ID of the user + + Returns: + List of active Auth objects for the user + """ + return db.query(Auth).filter(Auth.user_id == user_id).all() + +def invalidate_auth_token(db: Session, token: str) -> Union[Auth, Dict[str, str]]: + """ + Invalidate a specific authentication token. + + Args: + db: Database session + token: Token to invalidate + + Returns: + Auth object if found and invalidated, error dict otherwise + """ + auth_session = db.query(Auth).filter(Auth.token == token).first() + + if not auth_session: + return {"error": "Invalid or expired token"} + + db.delete(auth_session) + db.commit() + + return auth_session + +def logout_all_devices(db: Session, user_id: str) -> bool: + """ + Logout user from all devices by invalidating all auth tokens. + + Args: + db: Database session + user_id: ID of the user to logout + + Returns: + bool: True if successful, False otherwise + """ + try: + db.query(Auth).filter(Auth.user_id == user_id).delete() + db.commit() + return True + except Exception: + db.rollback() + return False + +def get_device_sessions(db: Session, user_id: str, device_id: Optional[str]) -> list[Auth]: + """ + Get authentication sessions for a specific device. + + Args: + db: Database session + user_id: ID of the user + device_id: Optional device ID to filter by + + Returns: + List of Auth objects matching the criteria + """ + query = db.query(Auth).filter(Auth.user_id == user_id) + + if device_id: + query = query.filter(Auth.device_id == device_id) + + return query.all() \ No newline at end of file