Automated Action cb4f834fc3 Implement complete Task Manager API with FastAPI
- Set up FastAPI application with CORS support
- Created SQLAlchemy models for task management
- Implemented CRUD operations for tasks
- Added database migrations with Alembic
- Created REST API endpoints for task operations
- Added health check endpoint
- Updated README with comprehensive documentation
- Applied code formatting with Ruff
2025-06-19 21:12:16 +00:00

72 lines
2.3 KiB
Python

from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List
import uvicorn
from app.db.session import get_db
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse
from app.crud.task import task_crud
app = FastAPI(
title="Task Manager API",
description="A simple task management API built with FastAPI",
version="1.0.0",
openapi_url="/openapi.json"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {
"title": "Task Manager API",
"documentation": "/docs",
"health_check": "/health"
}
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"message": "Task Manager API is running"
}
@app.post("/tasks/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
async def create_task(task: TaskCreate, db: Session = Depends(get_db)):
return task_crud.create(db=db, obj_in=task)
@app.get("/tasks/", response_model=List[TaskResponse])
async def get_tasks(db: Session = Depends(get_db)):
return task_crud.get_multi(db=db)
@app.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: int, db: Session = Depends(get_db)):
task = task_crud.get(db=db, id=task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.put("/tasks/{task_id}", response_model=TaskResponse)
async def update_task(task_id: int, task_update: TaskUpdate, db: Session = Depends(get_db)):
task = task_crud.get(db=db, id=task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task_crud.update(db=db, db_obj=task, obj_in=task_update)
@app.delete("/tasks/{task_id}")
async def delete_task(task_id: int, db: Session = Depends(get_db)):
task = task_crud.get(db=db, id=task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
task_crud.remove(db=db, id=task_id)
return {"message": "Task deleted successfully"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)