Automated Action f19a6fea04 Build complete Personal Task Management API with FastAPI
- Implemented user authentication with JWT tokens
- Created comprehensive task management with CRUD operations
- Added category system for task organization
- Set up SQLite database with SQLAlchemy ORM
- Configured Alembic for database migrations
- Added API documentation with OpenAPI/Swagger
- Implemented proper authorization and user scoping
- Created health check and root endpoints
- Updated README with complete documentation
2025-06-21 16:16:40 +00:00

102 lines
3.1 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from datetime import datetime
from app.db.session import get_db
from app.core.deps import get_current_active_user
from app.models.user import User
from app.models.task import Task as TaskModel, TaskStatus
from app.schemas.task import Task, TaskCreate, TaskUpdate
router = APIRouter()
@router.post("/", response_model=Task)
def create_task(
task: TaskCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
db_task = TaskModel(**task.dict(), owner_id=current_user.id)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
@router.get("/", response_model=List[Task])
def read_tasks(
skip: int = 0,
limit: int = 100,
status: Optional[TaskStatus] = Query(None),
category_id: Optional[int] = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
query = db.query(TaskModel).filter(TaskModel.owner_id == current_user.id)
if status:
query = query.filter(TaskModel.status == status)
if category_id:
query = query.filter(TaskModel.category_id == category_id)
tasks = query.offset(skip).limit(limit).all()
return tasks
@router.get("/{task_id}", response_model=Task)
def read_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
task = db.query(TaskModel).filter(
TaskModel.id == task_id,
TaskModel.owner_id == current_user.id
).first()
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
return task
@router.put("/{task_id}", response_model=Task)
def update_task(
task_id: int,
task_update: TaskUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
task = db.query(TaskModel).filter(
TaskModel.id == task_id,
TaskModel.owner_id == current_user.id
).first()
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
update_data = task_update.dict(exclude_unset=True)
# Set completed_at when task is marked as completed
if update_data.get("status") == TaskStatus.COMPLETED and task.status != TaskStatus.COMPLETED:
update_data["completed_at"] = datetime.utcnow()
elif update_data.get("status") != TaskStatus.COMPLETED:
update_data["completed_at"] = None
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
return task
@router.delete("/{task_id}")
def delete_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
task = db.query(TaskModel).filter(
TaskModel.id == task_id,
TaskModel.owner_id == current_user.id
).first()
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
db.delete(task)
db.commit()
return {"message": "Task deleted successfully"}