Automated Action 69f6a404bd Add user authentication to Todo application
- Create User model and schema
- Implement password hashing with bcrypt
- Add JWT token-based authentication
- Create user and auth endpoints
- Update todo endpoints with user authentication
- Add alembic migration for user model
- Update README with new features
2025-05-16 02:07:51 +00:00

85 lines
2.3 KiB
Python

from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.security import ALGORITHM, SECRET_KEY
from app.crud.user import get_user
from app.db.database import get_db
from app.models.user import User
from app.schemas.user import TokenData
# OAuth2 configuration
# Bearer-token security scheme shared by get_current_user and
# get_optional_current_user, which declare it as a dependency to receive the
# raw token string. tokenUrl is presumably the route that issues tokens --
# confirm against the auth router.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
    """
    Validates the access token and returns the current user

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user whose id is carried in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, carries a ``sub``
            that cannot be converted to an integer, or refers to a user that
            ``get_user`` does not find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: Optional[str] = payload.get("sub")
        if user_id is None:
            raise credentials_exception
        token_data = TokenData(user_id=int(user_id))
    except (JWTError, ValueError, TypeError):
        # ValueError/TypeError: a correctly signed token whose "sub" is not an
        # integer string must be rejected as bad credentials, not surface as
        # an unhandled 500 from int().
        raise credentials_exception from None

    user = get_user(db, user_id=token_data.user_id)
    if user is None:
        raise credentials_exception
    return user
def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Returns the authenticated user, rejecting accounts that are not active

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The same user object when its ``is_active`` flag is truthy.

    Raises:
        HTTPException: 400 with detail "Inactive user" when ``is_active`` is
            falsy.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        detail="Inactive user",
        status_code=status.HTTP_400_BAD_REQUEST,
    )
# Dedicated scheme for optional authentication. With auto_error=False the
# dependency yields None when the request carries no bearer token; the default
# (auto_error=True) answers 401 before the endpoint dependency ever runs.
_optional_oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/login", auto_error=False
)


def get_optional_current_user(
    db: Session = Depends(get_db),
    token: Optional[str] = Depends(_optional_oauth2_scheme),
) -> Optional[User]:
    """
    Similar to get_current_user but doesn't raise an exception for missing token

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        token: Encoded JWT, or ``None`` when the request has no bearer token.

    Returns:
        The active user identified by the token's ``sub`` claim, or ``None``
        when the token is absent, cannot be decoded, has no ``sub`` claim,
        carries a non-integer ``sub``, or refers to a user that is missing or
        inactive.
    """
    if token is None:
        return None

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: Optional[str] = payload.get("sub")
        if user_id is None:
            return None
        token_data = TokenData(user_id=int(user_id))
    except (JWTError, ValueError, TypeError):
        # A malformed "sub" is treated like any other unusable token: the
        # caller is simply anonymous rather than receiving a 500.
        return None

    user = get_user(db, user_id=token_data.user_id)
    if user is None or not user.is_active:
        return None
    return user