95 lines
2.4 KiB
Python
95 lines
2.4 KiB
Python
from typing import Optional, Dict, Union
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
from models.auth import Auth
|
|
from schemas.auth import AuthCreate
|
|
import uuid
|
|
|
|
def validate_token(token: str) -> bool:
    """
    Validate that a token is in the correct format.

    A token is considered well-formed when ``uuid.UUID`` can parse it.

    Args:
        token: Authentication token to validate

    Returns:
        bool: True if token format is valid, False otherwise
    """
    # uuid.UUID raises TypeError/AttributeError (not ValueError) for None and
    # other non-string input, so reject those up front instead of crashing.
    if not isinstance(token, str):
        return False

    try:
        # Check if token is valid UUID format
        uuid.UUID(token)
    except ValueError:
        return False
    return True
|
|
|
|
def get_active_auth_sessions(db: Session, user_id: str) -> list[Auth]:
    """
    Fetch the authentication sessions stored for a user.

    NOTE(review): no activity or expiry condition is applied here; every
    Auth row whose user_id matches is returned. Confirm that the mere
    existence of a row is what "active" means for callers.

    Args:
        db: Database session
        user_id: ID of the user

    Returns:
        List of Auth objects belonging to the user
    """
    all_sessions = db.query(Auth)
    user_sessions = all_sessions.filter(Auth.user_id == user_id)
    return user_sessions.all()
|
|
|
|
def invalidate_auth_token(db: Session, token: str) -> Union[Auth, Dict[str, str]]:
    """
    Invalidate a specific authentication token.

    Looks up the first Auth row whose token matches and deletes it.

    Args:
        db: Database session
        token: Token to invalidate

    Returns:
        Auth object if found and invalidated, error dict otherwise

    Raises:
        Exception: Whatever the delete or commit raised, re-raised unchanged
            after the session has been rolled back.
    """
    auth_session = db.query(Auth).filter(Auth.token == token).first()

    if not auth_session:
        return {"error": "Invalid or expired token"}

    try:
        db.delete(auth_session)
        db.commit()
    except Exception:
        # Roll back so the session is not left in a failed transaction,
        # matching logout_all_devices; the error still reaches the caller.
        db.rollback()
        raise

    return auth_session
|
|
|
|
def logout_all_devices(db: Session, user_id: str) -> bool:
    """
    Sign a user out everywhere by deleting all of their auth tokens.

    Any exception raised while deleting or committing is swallowed: the
    session is rolled back and False is returned instead.

    Args:
        db: Database session
        user_id: ID of the user to logout

    Returns:
        bool: True if the delete was committed, False if it failed and was
        rolled back
    """
    try:
        user_sessions = db.query(Auth).filter(Auth.user_id == user_id)
        user_sessions.delete()
        db.commit()
    except Exception:
        db.rollback()
        return False
    else:
        return True
|
|
|
|
def get_device_sessions(db: Session, user_id: str, device_id: Optional[str]) -> list[Auth]:
    """
    Fetch a user's authentication sessions, optionally narrowed to a device.

    Args:
        db: Database session
        user_id: ID of the user
        device_id: Device ID to filter by; a falsy value (None or an empty
            string) skips the device filter

    Returns:
        List of Auth objects matching the criteria
    """
    user_sessions = db.query(Auth).filter(Auth.user_id == user_id)

    # No device given: return every session belonging to the user.
    if not device_id:
        return user_sessions.all()

    return user_sessions.filter(Auth.device_id == device_id).all()