Automated Action 6f269066a5 Implement task management tool with FastAPI and SQLite
This commit includes:
- Project structure and configuration
- Database models for tasks, users, and categories
- Authentication system with JWT
- CRUD endpoints for tasks and categories
- Search, filter, and sorting functionality
- Health check endpoint
- Alembic migration setup
- Documentation
2025-06-03 07:48:27 +00:00

109 lines
2.5 KiB
Python

from typing import List, Optional
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskUpdate
def get_task(db: Session, task_id: int) -> Optional[Task]:
"""
Get a task by ID
"""
return db.query(Task).filter(Task.id == task_id).first()
def get_task_by_user(db: Session, task_id: int, user_id: int) -> Optional[Task]:
"""
Get a task by ID for a specific user
"""
return db.query(Task).filter(
Task.id == task_id, Task.owner_id == user_id
).first()
def get_tasks(
db: Session,
user_id: int,
skip: int = 0,
limit: int = 100,
completed: Optional[bool] = None,
category_id: Optional[int] = None,
search: Optional[str] = None,
priority: Optional[str] = None,
) -> List[Task]:
"""
Get all tasks for a user with optional filtering
"""
query = db.query(Task).filter(Task.owner_id == user_id)
# Apply filters
if completed is not None:
query = query.filter(Task.is_completed == completed)
if category_id:
query = query.filter(Task.category_id == category_id)
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
Task.title.ilike(search_term),
Task.description.ilike(search_term)
)
)
if priority:
query = query.filter(Task.priority == priority)
# Apply pagination and return results
return query.order_by(Task.created_at.desc()).offset(skip).limit(limit).all()
def create_task(db: Session, task_in: TaskCreate, user_id: int) -> Task:
"""
Create a new task for a user
"""
db_task = Task(
**task_in.dict(),
owner_id=user_id,
)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
def update_task(
db: Session, task_id: int, task_in: TaskUpdate, user_id: int
) -> Optional[Task]:
"""
Update a task
"""
db_task = get_task_by_user(db, task_id, user_id)
if not db_task:
return None
update_data = task_in.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_task, field, value)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
def delete_task(db: Session, task_id: int, user_id: int) -> Optional[Task]:
"""
Delete a task
"""
db_task = get_task_by_user(db, task_id, user_id)
if not db_task:
return None
db.delete(db_task)
db.commit()
return db_task