
- Add role-based access control (admin/user roles) - Implement refresh token functionality - Add token revocation (logout) capability - Create admin-only endpoints - Add role validation middleware - Update documentation
67 lines
1.7 KiB
Python
67 lines
1.7 KiB
Python
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.token import RefreshToken
|
|
from app.schemas.token import RefreshTokenCreate, RefreshTokenUpdate
|
|
|
|
|
|
def get_refresh_token(db: Session, token: str) -> Optional[RefreshToken]:
|
|
return db.query(RefreshToken).filter(RefreshToken.token == token).first()
|
|
|
|
|
|
def get_refresh_tokens_by_user(
|
|
db: Session, user_id: int, skip: int = 0, limit: int = 100
|
|
) -> List[RefreshToken]:
|
|
return (
|
|
db.query(RefreshToken)
|
|
.filter(RefreshToken.user_id == user_id)
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
|
|
def create_refresh_token(db: Session, token_in: RefreshTokenCreate) -> RefreshToken:
|
|
db_token = RefreshToken(
|
|
token=token_in.token,
|
|
expires_at=token_in.expires_at,
|
|
user_id=token_in.user_id,
|
|
created_at=datetime.utcnow(),
|
|
revoked=False,
|
|
)
|
|
db.add(db_token)
|
|
db.commit()
|
|
db.refresh(db_token)
|
|
return db_token
|
|
|
|
|
|
def update_refresh_token(
|
|
db: Session, db_obj: RefreshToken, obj_in: RefreshTokenUpdate
|
|
) -> RefreshToken:
|
|
update_data = obj_in.model_dump(exclude_unset=True)
|
|
for field in update_data:
|
|
setattr(db_obj, field, update_data[field])
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
|
|
def revoke_refresh_token(db: Session, token: str) -> Optional[RefreshToken]:
|
|
db_token = get_refresh_token(db, token=token)
|
|
if db_token:
|
|
db_token.revoked = True
|
|
db.add(db_token)
|
|
db.commit()
|
|
db.refresh(db_token)
|
|
return db_token
|
|
|
|
|
|
def is_token_valid(token: RefreshToken) -> bool:
|
|
return (
|
|
token is not None
|
|
and not token.revoked
|
|
and token.expires_at > datetime.utcnow()
|
|
) |