Automated Action f8bb3dd21d Implement Task Management Tool with FastAPI and SQLite
- Set up FastAPI project structure with API versioning
- Create database models for users and tasks
- Implement SQLAlchemy ORM with SQLite database
- Initialize Alembic for database migrations
- Create API endpoints for task management (CRUD)
- Create API endpoints for user management
- Add JWT authentication and authorization
- Add health check endpoint
- Add comprehensive README.md with API documentation
2025-06-02 20:40:57 +00:00

124 lines
3.7 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.deps import get_current_active_user
from app.core.config import settings
from app.core.security import create_access_token
from app.crud import user as crud_user
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate
# Router on which this module's endpoints (/register, /login, /me) are registered.
router = APIRouter()
@router.post(
    "/register",
    response_model=UserSchema,
    status_code=status.HTTP_201_CREATED,
)
def create_user(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
) -> Any:
    """
    Register a new user account.

    The email is looked up first, then the username. If either lookup finds
    an existing user, the request is rejected with HTTP 400 and nothing is
    created. Otherwise the user is created via ``crud_user.create`` and
    returned (serialized with ``UserSchema``, status 201).
    """
    # Guard: the email must not belong to an existing account.
    if crud_user.get_by_email(db, email=user_in.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Guard: the username must not belong to an existing account.
    if crud_user.get_by_username(db, username=user_in.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken",
        )

    # Both checks passed; create the record and hand it back.
    return crud_user.create(db, obj_in=user_in)
@router.post("/login", response_model=Token)
def login_access_token(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    Exchange username/password form credentials for a bearer access token.

    Responds with HTTP 401 when ``crud_user.authenticate`` yields no user,
    and with HTTP 400 when the authenticated user is not active. On success
    the response carries an ``access_token`` created for ``user.id`` with a
    lifetime of ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes, plus
    ``token_type`` set to ``"bearer"``.
    """
    account = crud_user.authenticate(
        db=db,
        username=form_data.username,
        password=form_data.password,
    )

    # Authentication failed: no user came back for these credentials.
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Credentials were accepted, but the account is flagged inactive.
    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(account.id, expires_delta=lifetime)
    return {
        "access_token": token,
        "token_type": "bearer",
    }
@router.get("/me", response_model=UserSchema)
def read_user_me(current_user: User = Depends(get_current_active_user)) -> Any:
    """
    Return the user resolved by the ``get_current_active_user`` dependency.

    The user object is returned unchanged and serialized with ``UserSchema``.
    """
    return current_user
@router.put("/me", response_model=UserSchema)
def update_user_me(
    *,
    db: Session = Depends(get_db),
    user_in: UserUpdate,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Apply the submitted changes to the current user's own record.

    An email or username is only checked for uniqueness when it is provided
    (truthy) and differs from the current user's value. If the new value is
    already used by an existing user, the request is rejected with HTTP 400.
    Otherwise the update goes through ``crud_user.update`` and its result is
    returned.
    """
    # Email: only look it up when the caller is actually changing it.
    requested_email = user_in.email
    if requested_email and requested_email != current_user.email:
        if crud_user.get_by_email(db, email=requested_email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

    # Username: same rule as for the email above.
    requested_username = user_in.username
    if requested_username and requested_username != current_user.username:
        if crud_user.get_by_username(db, username=requested_username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already taken",
            )

    # No conflicts found; persist the changes.
    return crud_user.update(db, db_obj=current_user, obj_in=user_in)