
- Create complete task management system with CRUD operations - Add Task model with status, priority, timestamps - Set up SQLite database with SQLAlchemy and Alembic migrations - Implement RESTful API endpoints for task operations - Configure CORS middleware and API documentation - Add health check endpoint and root information response - Include proper project structure and comprehensive README
57 lines
1.8 KiB
Python
57 lines
1.8 KiB
Python
from typing import List
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.orm import Session
|
|
from app.db.session import get_db
|
|
from app.models.task import Task as TaskModel
|
|
from app.schemas.task import Task, TaskCreate, TaskUpdate
|
|
|
|
# Router object on which the task endpoints in this module are registered
# via the @router.<method> decorators.
router = APIRouter()
|
|
|
|
|
|
@router.post("/", response_model=Task)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
    """Persist a new task built from the submitted payload.

    The payload's fields are expanded into ``TaskModel`` keyword arguments,
    the new row is added to the session and committed, and the instance is
    refreshed before being returned.

    Args:
        task: Fields for the task to create.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The newly stored ``TaskModel`` instance.
    """
    fields = task.dict()
    new_row = TaskModel(**fields)

    db.add(new_row)
    db.commit()
    # Reload the instance's state from the database after the commit.
    db.refresh(new_row)

    return new_row
|
|
|
|
|
|
@router.get("/", response_model=List[Task])
def read_tasks(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of tasks, ordered by ascending ``id``.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list of ``TaskModel`` instances for the requested window.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or skip rows. Ordering by the primary
    # key makes the pagination deterministic.
    return (
        db.query(TaskModel)
        .order_by(TaskModel.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
|
|
@router.get("/{task_id}", response_model=Task)
def read_task(task_id: int, db: Session = Depends(get_db)):
    """Fetch a single task by its identifier.

    Args:
        task_id: Value matched against ``TaskModel.id``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The first ``TaskModel`` row whose ``id`` equals ``task_id``.

    Raises:
        HTTPException: With status 404 when no such row exists.
    """
    candidates = db.query(TaskModel).filter(TaskModel.id == task_id)
    found = candidates.first()

    if found is not None:
        return found

    raise HTTPException(status_code=404, detail="Task not found")
|
|
|
|
|
|
@router.put("/{task_id}", response_model=Task)
def update_task(task_id: int, task_update: TaskUpdate, db: Session = Depends(get_db)):
    """Apply the fields set in ``task_update`` to an existing task.

    Args:
        task_id: Value matched against ``TaskModel.id``.
        task_update: Update payload; only fields explicitly set on it are
            written (``exclude_unset=True``).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The updated ``TaskModel`` instance, refreshed after the commit.

    Raises:
        HTTPException: With status 404 when no such row exists.
    """
    lookup = db.query(TaskModel).filter(TaskModel.id == task_id)
    existing = lookup.first()
    if existing is None:
        raise HTTPException(status_code=404, detail="Task not found")

    # Copy over only the fields the payload actually set; everything else
    # on the row is left as it was.
    changes = task_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(existing, attr_name, changes[attr_name])

    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
|
|
@router.delete("/{task_id}")
def delete_task(task_id: int, db: Session = Depends(get_db)):
    """Delete the task with the given identifier.

    Args:
        task_id: Value matched against ``TaskModel.id``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: With status 404 when no such row exists.
    """
    lookup = db.query(TaskModel).filter(TaskModel.id == task_id)
    doomed = lookup.first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(doomed)
    db.commit()

    confirmation = {"message": "Task deleted successfully"}
    return confirmation