Automated Action 350b445f45 Implement task management tool with FastAPI and SQLite
- Setup project structure and FastAPI application
- Create SQLite database with SQLAlchemy
- Implement user authentication with JWT
- Create task and user models
- Add CRUD operations for tasks and users
- Configure Alembic for database migrations
- Implement API endpoints for task management
- Add error handling and validation
- Configure CORS middleware
- Create health check endpoint
- Add comprehensive documentation
2025-06-12 12:02:37 +00:00

70 lines
2.0 KiB
Python

from datetime import datetime
from typing import Optional
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.exceptions import (
ForbiddenException,
InactiveUserException,
UnauthorizedException,
)
from app.core.security import verify_password
from app.crud.crud_user import user
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import TokenPayload
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
async def get_current_user(
db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
credentials_exception = UnauthorizedException("Could not validate credentials")
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
if datetime.fromtimestamp(token_data.exp) < datetime.now():
raise credentials_exception
except JWTError as e:
raise credentials_exception from e
user_obj = await user.get(db, id_=token_data.sub)
if not user_obj:
raise credentials_exception
return user_obj
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_active:
raise InactiveUserException()
return current_user
async def get_current_active_superuser(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_superuser:
raise ForbiddenException("The user doesn't have enough privileges")
return current_user
async def authenticate(
db: AsyncSession, *, email: str, password: str
) -> Optional[User]:
user_obj = await user.get_by_email(db, email=email)
if not user_obj:
return None
if not verify_password(password, user_obj.hashed_password):
return None
return user_obj