Add helper functions for Auth
This commit is contained in:
parent
16a4d6ff97
commit
dc03d149d7
95
helpers/auth_helpers.py
Normal file
95
helpers/auth_helpers.py
Normal file
@ -0,0 +1,95 @@
|
||||
from typing import Optional, Dict, Union
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import Session
|
||||
from models.auth import Auth
|
||||
from schemas.auth import AuthCreate
|
||||
import uuid
|
||||
|
||||
def validate_token(token: str) -> bool:
|
||||
"""
|
||||
Validate that a token is in the correct format.
|
||||
|
||||
Args:
|
||||
token: Authentication token to validate
|
||||
|
||||
Returns:
|
||||
bool: True if token format is valid, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Check if token is valid UUID format
|
||||
uuid.UUID(token)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def get_active_auth_sessions(db: Session, user_id: str) -> list[Auth]:
|
||||
"""
|
||||
Get all active authentication sessions for a user.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
user_id: ID of the user
|
||||
|
||||
Returns:
|
||||
List of active Auth objects for the user
|
||||
"""
|
||||
return db.query(Auth).filter(Auth.user_id == user_id).all()
|
||||
|
||||
def invalidate_auth_token(db: Session, token: str) -> Union[Auth, Dict[str, str]]:
|
||||
"""
|
||||
Invalidate a specific authentication token.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
token: Token to invalidate
|
||||
|
||||
Returns:
|
||||
Auth object if found and invalidated, error dict otherwise
|
||||
"""
|
||||
auth_session = db.query(Auth).filter(Auth.token == token).first()
|
||||
|
||||
if not auth_session:
|
||||
return {"error": "Invalid or expired token"}
|
||||
|
||||
db.delete(auth_session)
|
||||
db.commit()
|
||||
|
||||
return auth_session
|
||||
|
||||
def logout_all_devices(db: Session, user_id: str) -> bool:
|
||||
"""
|
||||
Logout user from all devices by invalidating all auth tokens.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
user_id: ID of the user to logout
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
db.query(Auth).filter(Auth.user_id == user_id).delete()
|
||||
db.commit()
|
||||
return True
|
||||
except Exception:
|
||||
db.rollback()
|
||||
return False
|
||||
|
||||
def get_device_sessions(db: Session, user_id: str, device_id: Optional[str]) -> list[Auth]:
|
||||
"""
|
||||
Get authentication sessions for a specific device.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
user_id: ID of the user
|
||||
device_id: Optional device ID to filter by
|
||||
|
||||
Returns:
|
||||
List of Auth objects matching the criteria
|
||||
"""
|
||||
query = db.query(Auth).filter(Auth.user_id == user_id)
|
||||
|
||||
if device_id:
|
||||
query = query.filter(Auth.device_id == device_id)
|
||||
|
||||
return query.all()
|
Loading…
x
Reference in New Issue
Block a user