import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import jwt, JWTError
from sqlalchemy.orm import Session

from app.core.config import settings
from app.core.security import ALGORITHM
from app.models.token import Token

# Lifetime applied to a refresh token when the caller passes no expires_delta.
_DEFAULT_REFRESH_TOKEN_LIFETIME = timedelta(days=7)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as the deprecated ``datetime.utcnow()`` so that
    stored and compared timestamps keep their existing (naive) form.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def create_refresh_token(
    db: Session, *, user_id: int, expires_delta: Optional[timedelta] = None
) -> Token:
    """Create a refresh token row in the database and return it.

    Args:
        db: Session used to add, commit and refresh the new row.
        user_id: Value stored in the token's ``user_id`` field.
        expires_delta: Lifetime of the token measured from now. When ``None``
            the default of 7 days is used.

    Returns:
        The ``Token`` instance after it has been committed and refreshed.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # test would silently replace an explicit zero lifetime with the default.
    lifetime = (
        _DEFAULT_REFRESH_TOKEN_LIFETIME if expires_delta is None else expires_delta
    )
    expire = _utcnow() + lifetime

    # Random URL-safe string built from 64 bytes of OS-level randomness.
    token_value = secrets.token_urlsafe(64)

    db_token = Token(
        user_id=user_id,
        token=token_value,
        expires_at=expire,
        is_revoked=False,
    )
    db.add(db_token)
    db.commit()
    db.refresh(db_token)

    return db_token


def get_by_token(db: Session, token: str) -> Optional[Token]:
    """Return the first ``Token`` whose ``token`` column equals *token*.

    Returns ``None`` when no row matches.
    """
    return db.query(Token).filter(Token.token == token).first()


def is_token_valid(token: Token) -> bool:
    """Return ``True`` when *token* is neither expired nor revoked.

    ``token.expires_at`` is compared against the current naive UTC time.
    NOTE(review): this assumes ``expires_at`` is loaded as a naive datetime,
    as written by ``create_refresh_token`` - confirm against the model.
    """
    now = _utcnow()
    return token.expires_at > now and not token.is_revoked


def revoke_token(db: Session, token: Token) -> Token:
    """Mark *token* as revoked, commit the change and return the refreshed row."""
    token.is_revoked = True
    db.add(token)
    db.commit()
    db.refresh(token)

    return token


def revoke_all_user_tokens(db: Session, user_id: int) -> None:
    """Mark every token belonging to *user_id* as revoked in one commit."""
    tokens = db.query(Token).filter(Token.user_id == user_id).all()
    for token in tokens:
        token.is_revoked = True

    db.commit()


def decode_token(token: str) -> Optional[dict]:
    """Decode a JWT using ``settings.SECRET_KEY`` and ``ALGORITHM``.

    Returns:
        The decoded payload, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None