84 lines
1.9 KiB
Python
84 lines
1.9 KiB
Python
import secrets
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from jose import jwt, JWTError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import settings
|
|
from app.core.security import ALGORITHM
|
|
from app.models.token import Token
|
|
|
|
|
|
def create_refresh_token(
    db: Session, *, user_id: int, expires_delta: Optional[timedelta] = None
) -> Token:
    """
    Create a refresh token in the database and return it.

    Args:
        db: Session used to add, commit and refresh the new row.
        user_id: Value stored in the new token's ``user_id`` field.
        expires_delta: Lifetime of the token, measured from the current
            naive UTC time. When ``None`` (the default) the token expires
            after 7 days.

    Returns:
        The newly created ``Token`` after it has been committed and
        refreshed from the database.
    """
    # Test against None instead of truthiness: timedelta(0) is falsy, so an
    # explicitly requested zero lifetime would otherwise be silently replaced
    # by the 7-day default.
    if expires_delta is None:
        expires_delta = timedelta(days=7)  # 7 days default
    expire = datetime.utcnow() + expires_delta

    # Generate a secure random token (64 random bytes, URL-safe encoded)
    token_value = secrets.token_urlsafe(64)

    # Create token in DB
    db_token = Token(
        user_id=user_id, token=token_value, expires_at=expire, is_revoked=False
    )
    db.add(db_token)
    db.commit()
    # Reload the row so the returned object carries its database state.
    db.refresh(db_token)

    return db_token
|
|
|
|
|
|
def get_by_token(db: Session, token: str) -> Optional[Token]:
    """
    Look up a stored token by its string value.

    Returns the first ``Token`` row whose ``token`` column equals the given
    value, or ``None`` when no row matches.
    """
    matching_rows = db.query(Token).filter(Token.token == token)
    return matching_rows.first()
|
|
|
|
|
|
def is_token_valid(token: Token) -> bool:
    """
    Tell whether a token can still be used.

    A token is valid when its ``expires_at`` is later than the current
    naive UTC time and its ``is_revoked`` flag is falsy.
    """
    # Expiry is checked first, then the revoked flag.
    not_yet_expired = token.expires_at > datetime.utcnow()
    if not not_yet_expired:
        return False
    return not token.is_revoked
|
|
|
|
|
|
def revoke_token(db: Session, token: Token) -> Token:
    """
    Mark a single token as revoked and persist the change.

    Sets ``is_revoked`` to ``True`` on the given token, commits the session,
    refreshes the object from the database and returns that same object.
    """
    # Flip the flag on the object itself.
    token.is_revoked = True

    # Persist the change.
    db.add(token)
    db.commit()

    # Reload the row before handing the object back to the caller.
    db.refresh(token)
    return token
|
|
|
|
|
|
def revoke_all_user_tokens(db: Session, user_id: int) -> None:
    """
    Mark every token belonging to a user as revoked.

    Loads all ``Token`` rows whose ``user_id`` matches, sets ``is_revoked``
    to ``True`` on each of them, and commits once at the end.
    """
    user_token_query = db.query(Token).filter(Token.user_id == user_id)

    for stored_token in user_token_query.all():
        stored_token.is_revoked = True

    # Single commit covering every row touched above.
    db.commit()
|
|
|
|
|
|
def decode_token(token: str) -> Optional[dict]:
    """
    Decode a JWT and return its payload.

    The token is decoded with ``settings.SECRET_KEY`` and the configured
    ``ALGORITHM``. Returns the decoded payload, or ``None`` when decoding
    raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Decoding failed: signal this to the caller with None.
        return None
    return claims
|