84 lines
1.9 KiB
Python

import secrets
from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import ALGORITHM
from app.models.token import Token
def create_refresh_token(
db: Session, *, user_id: int, expires_delta: Optional[timedelta] = None
) -> Token:
"""
Create a refresh token in the database
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(days=7) # 7 days default
# Generate a secure random token
token_value = secrets.token_urlsafe(64)
# Create token in DB
db_token = Token(
user_id=user_id, token=token_value, expires_at=expire, is_revoked=False
)
db.add(db_token)
db.commit()
db.refresh(db_token)
return db_token
def get_by_token(db: Session, token: str) -> Optional[Token]:
"""
Get a token by its value
"""
return db.query(Token).filter(Token.token == token).first()
def is_token_valid(token: Token) -> bool:
"""
Check if a token is valid (not expired and not revoked)
"""
now = datetime.utcnow()
return token.expires_at > now and not token.is_revoked
def revoke_token(db: Session, token: Token) -> Token:
"""
Revoke a token
"""
token.is_revoked = True
db.add(token)
db.commit()
db.refresh(token)
return token
def revoke_all_user_tokens(db: Session, user_id: int) -> None:
"""
Revoke all tokens for a user
"""
tokens = db.query(Token).filter(Token.user_id == user_id).all()
for token in tokens:
token.is_revoked = True
db.commit()
def decode_token(token: str) -> Optional[dict]:
"""
Decode a JWT token
"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None