Automated Action f63bd5104b Add Task Manager API with FastAPI and SQLite
- Created FastAPI application structure with main.py
- Implemented SQLAlchemy models for tasks with priority and completion status
- Set up database connection with SQLite using absolute paths
- Created Alembic migrations for database schema
- Added CRUD operations for task management
- Implemented health check and root endpoints
- Added CORS configuration for cross-origin requests
- Created comprehensive API documentation
- Added Ruff configuration for code linting
- Updated README with installation and usage instructions
2025-06-24 07:52:20 +00:00

72 lines
2.4 KiB
Python

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List
import os
from pathlib import Path
from app.db.session import get_db
from app.models.task import Task as TaskModel
from app.schemas.task import Task, TaskCreate, TaskUpdate
from app.crud.task import create_task, get_tasks, get_task, update_task, delete_task
# Metadata handed to the FastAPI constructor when the application object is built.
_APP_METADATA = {
    "title": "Task Manager API",
    "description": "A simple task management API built with FastAPI",
    "version": "1.0.0",
}

# The ASGI application; every route below is registered on this instance.
app = FastAPI(**_APP_METADATA)
# CORS policy: every origin, method and header is accepted, so credentialed
# requests (cookies, Authorization headers) are deliberately NOT allowed.
# Pairing a wildcard origin with allow_credentials=True would let any website
# issue authenticated cross-origin calls on behalf of a visitor.
# To support credentials, replace "*" with an explicit list of trusted
# origins and only then set allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Return basic service metadata plus the paths of the docs and health endpoints."""
    # Read title/description/version from the application object rather than
    # repeating the literals given to FastAPI(...), so the two cannot drift apart.
    return {
        "title": app.title,
        "description": app.description,
        "version": app.version,
        "documentation": "/docs",
        "health_check": "/health",
    }
@app.get("/health")
async def health_check():
    """Report a static "healthy" status together with the service name."""
    # The payload is constant: no database or downstream check is performed here.
    return dict(status="healthy", service="Task Manager API")
@app.post("/tasks/", response_model=Task)
async def create_new_task(
    task: TaskCreate,
    db: Session = Depends(get_db),
):
    """Create a task from the request body and return the stored task."""
    # Persistence is delegated entirely to the CRUD layer; its result is
    # returned as-is and serialized through the `Task` response model.
    created = create_task(task=task, db=db)
    return created
@app.get("/tasks/", response_model=List[Task])
async def read_tasks(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List tasks using offset/limit pagination.

    - **skip**: number of tasks to skip (must be >= 0, default 0)
    - **limit**: maximum number of tasks to return (must be >= 0, default 100)

    Responds with 422 when either value is negative.
    """
    # Reject negative paging values up front. They are presumably forwarded to
    # SQL OFFSET/LIMIT by get_tasks (verify against app.crud.task); on SQLite a
    # negative LIMIT means "no limit", which would silently bypass pagination.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=422,
            detail="skip and limit must be non-negative",
        )
    return get_tasks(db=db, skip=skip, limit=limit)
@app.get("/tasks/{task_id}", response_model=Task)
async def read_task(
    task_id: int,
    db: Session = Depends(get_db),
):
    """Return the task with the given id, or respond with 404 if it does not exist."""
    found = get_task(task_id=task_id, db=db)
    if found is not None:
        return found
    # The CRUD layer signals a missing row with None; translate that to HTTP 404.
    raise HTTPException(status_code=404, detail="Task not found")
@app.put("/tasks/{task_id}", response_model=Task)
async def update_existing_task(
    task_id: int,
    task: TaskUpdate,
    db: Session = Depends(get_db),
):
    """Apply the request body to the task with the given id and return the result.

    Responds with 404 if the task does not exist.
    """
    updated = update_task(task=task, task_id=task_id, db=db)
    if updated is not None:
        return updated
    # update_task returned None: there was nothing to update under this id.
    raise HTTPException(status_code=404, detail="Task not found")
@app.delete("/tasks/{task_id}")
async def delete_existing_task(
    task_id: int,
    db: Session = Depends(get_db),
):
    """Delete the task with the given id and confirm with a message.

    Responds with 404 if the task does not exist.
    """
    # delete_task's result is used only for its truthiness: truthy means the
    # deletion happened, falsy means no task was found under this id.
    if delete_task(task_id=task_id, db=db):
        return {"message": "Task deleted successfully"}
    raise HTTPException(status_code=404, detail="Task not found")
if __name__ == "__main__":
    # Executed only when this module is run directly (not when imported):
    # serve the application with uvicorn on 0.0.0.0 (all interfaces), port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)