Implement Task Manager API with FastAPI and SQLite

This commit is contained in:
Automated Action 2025-05-30 17:39:08 +00:00
parent d13a1112ea
commit ba16743ab1
33 changed files with 1796 additions and 2 deletions

147
README.md
View File

@ -1,3 +1,146 @@
# FastAPI Application # Task Manager API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform. A robust RESTful API for managing tasks, built with FastAPI and SQLite.
## Features
- **User Authentication**: Secure registration and login with JWT tokens
- **Task Management**: Complete CRUD operations for tasks
- **Task Filtering**: Filter tasks by status, priority, and search terms
- **Health Monitoring**: Endpoint for application health checking
- **API Documentation**: Interactive documentation with Swagger UI and ReDoc
- **Database Migrations**: Easy database schema management with Alembic
## Tech Stack
- **FastAPI**: Modern, fast web framework for building APIs
- **SQLAlchemy**: SQL toolkit and ORM
- **Alembic**: Database migration tool
- **Pydantic**: Data validation and settings management
- **SQLite**: Lightweight, file-based database
- **JWT**: JSON Web Tokens for secure authentication
- **Ruff**: Fast Python linter for code quality
## Project Structure
```
task-manager-api/
├── alembic.ini # Alembic configuration
├── main.py # Application entry point
├── pyproject.toml # Project configuration
├── requirements.txt # Project dependencies
├── app/ # Application package
│ ├── api/ # API endpoints
│ │ ├── deps.py # API dependencies
│ │ └── v1/ # API version 1
│ │ ├── api.py # API router
│ │ └── endpoints/ # API endpoint modules
│ │ ├── auth.py # Authentication endpoints
│ │ ├── tasks.py # Task management endpoints
│ │ └── users.py # User management endpoints
│ ├── core/ # Core modules
│ │ ├── config.py # Application configuration
│ │ └── security.py # Security utilities
│ ├── crud/ # CRUD operations
│ │ ├── base.py # Base CRUD class
│ │ ├── task.py # Task CRUD operations
│ │ └── user.py # User CRUD operations
│ ├── db/ # Database configuration
│ │ └── database.py # Database setup
│ ├── models/ # SQLAlchemy models
│ │ ├── task.py # Task model
│ │ └── user.py # User model
│ └── schemas/ # Pydantic schemas
│ ├── task.py # Task schemas
│ └── user.py # User schemas
└── migrations/ # Alembic migrations
├── env.py # Alembic environment
├── script.py.mako # Alembic script template
└── versions/ # Migration versions
```
## Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd task-manager-api
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run database migrations:
```bash
alembic upgrade head
```
4. Start the application:
```bash
uvicorn main:app --reload
```
## API Documentation
Once the application is running, you can access:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
- OpenAPI Schema: http://localhost:8000/openapi.json
## Environment Variables
The application uses the following environment variables:
- `SECRET_KEY`: Secret key for JWT tokens (default: "changethisinproduction")
- `ACCESS_TOKEN_EXPIRE_MINUTES`: Token expiration time in minutes (default: 30)
- `API_V1_STR`: API version prefix (default: "/api/v1")
## Usage Examples
### Authentication
```bash
# Register a new user
curl -X POST "http://localhost:8000/api/v1/auth/register" \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "username": "testuser", "password": "Password123"}'
# Login and get access token
curl -X POST "http://localhost:8000/api/v1/auth/login" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=user@example.com&password=Password123"
```
### Task Management
```bash
# Create a new task (requires authentication)
curl -X POST "http://localhost:8000/api/v1/tasks" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title": "Complete project", "description": "Finish the API project", "priority": "high", "due_date": "2023-12-31T23:59:59"}'
# Get all tasks
curl -X GET "http://localhost:8000/api/v1/tasks" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
# Get a specific task
curl -X GET "http://localhost:8000/api/v1/tasks/1" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
# Update a task
curl -X PUT "http://localhost:8000/api/v1/tasks/1" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"status": "in_progress"}'
# Delete a task
curl -X DELETE "http://localhost:8000/api/v1/tasks/1" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.

85
alembic.ini Normal file
View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL with absolute path
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

69
app/api/deps.py Normal file
View File

@ -0,0 +1,69 @@
"""
API dependencies
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core.config import settings
from app.db.database import get_db
# OAuth2 scheme for token authentication
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
"""
Dependency to get the current authenticated user
"""
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
"""
Dependency to get the current active user
"""
if not crud.user.is_active(current_user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
"""
Dependency to get the current active superuser
"""
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges"
)
return current_user

0
app/api/v1/__init__.py Normal file
View File

13
app/api/v1/api.py Normal file
View File

@ -0,0 +1,13 @@
"""
API v1 router
"""
from fastapi import APIRouter
from app.api.v1.endpoints import auth, tasks, users
api_router = APIRouter()
# Include routers for different endpoints
api_router.include_router(auth.router, prefix="/auth", tags=["Authentication"])
api_router.include_router(tasks.router, prefix="/tasks", tags=["Tasks"])
api_router.include_router(users.router, prefix="/users", tags=["Users"])

View File

@ -0,0 +1,77 @@
"""
Authentication endpoints
"""
from datetime import timedelta
from typing import Any
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
router = APIRouter()
@router.post("/login", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not crud.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=schemas.User)
def register_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Register a new user
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists",
)
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_users_me(
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user
"""
return current_user

View File

@ -0,0 +1,141 @@
"""
Task management endpoints
"""
from datetime import datetime
from typing import Any, List, Optional
from app import crud, models, schemas
from app.api import deps
from app.models.task import TaskPriority, TaskStatus
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
router = APIRouter()
@router.post("", response_model=schemas.Task)
def create_task(
*,
db: Session = Depends(deps.get_db),
task_in: schemas.TaskCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create a new task
"""
task = crud.task.create_with_owner(db=db, obj_in=task_in, owner_id=current_user.id)
return task
@router.get("", response_model=List[schemas.Task])
def read_tasks(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
status: Optional[TaskStatus] = None,
priority: Optional[TaskPriority] = None,
search: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve tasks with filtering options
"""
if status:
tasks = crud.task.get_multi_by_owner_and_status(
db=db, owner_id=current_user.id, status=status, skip=skip, limit=limit
)
elif priority:
tasks = crud.task.get_multi_by_owner_and_priority(
db=db, owner_id=current_user.id, priority=priority, skip=skip, limit=limit
)
elif search:
tasks = crud.task.get_multi_by_owner_and_search(
db=db, owner_id=current_user.id, search=search, skip=skip, limit=limit
)
elif start_date and end_date:
tasks = crud.task.get_multi_by_owner_and_due_date_range(
db=db, owner_id=current_user.id, start_date=start_date, end_date=end_date,
skip=skip, limit=limit
)
else:
tasks = crud.task.get_multi_by_owner(
db=db, owner_id=current_user.id, skip=skip, limit=limit
)
return tasks
@router.get("/{id}", response_model=schemas.Task)
def read_task(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get task by ID
"""
task = crud.task.get_by_id_and_owner(db=db, id=id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
return task
@router.put("/{id}", response_model=schemas.Task)
def update_task(
*,
db: Session = Depends(deps.get_db),
id: int,
task_in: schemas.TaskUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a task
"""
task = crud.task.get_by_id_and_owner(db=db, id=id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
task = crud.task.update(db=db, db_obj=task, obj_in=task_in)
return task
@router.patch("/{id}/status", response_model=schemas.Task)
def update_task_status(
*,
db: Session = Depends(deps.get_db),
id: int,
status: TaskStatus,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a task status
"""
task = crud.task.get_by_id_and_owner(db=db, id=id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
task = crud.task.update_status(db=db, db_obj=task, status=status)
return task
@router.delete("/{id}", response_model=schemas.Task)
def delete_task(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a task (soft delete)
"""
task = crud.task.get_by_id_and_owner(db=db, id=id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
task = crud.task.soft_delete(db=db, db_obj=task)
return task

View File

@ -0,0 +1,77 @@
"""
User management endpoints
"""
from typing import Any, List
from app import crud, models, schemas
from app.api import deps
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
router = APIRouter()
@router.get("", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users (superuser only)
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.get("/{user_id}", response_model=schemas.User)
def read_user(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges"
)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user
"""
if user_in.username is not None:
user_with_username = crud.user.get_by_username(db, username=user_in.username)
if user_with_username and user_with_username.id != current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
if user_in.email is not None:
user_with_email = crud.user.get_by_email(db, email=user_in.email)
if user_with_email and user_with_email.id != current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user

0
app/core/__init__.py Normal file
View File

37
app/core/config.py Normal file
View File

@ -0,0 +1,37 @@
"""
Application configuration from environment variables
"""
import os
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""
Application settings from environment variables
"""
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Task Manager API"
# CORS settings
BACKEND_CORS_ORIGINS: List[str] = ["*"]
# Security settings
SECRET_KEY: str = os.getenv("SECRET_KEY", "changethisinproduction")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Database settings
DB_DIR: Path = Path("/app") / "storage" / "db"
DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
class Config:
"""Pydantic config class"""
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
# Create global settings object
settings = Settings()

43
app/core/security.py Normal file
View File

@ -0,0 +1,43 @@
"""
Security utilities for authentication and authorization
"""
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
# Password context for hashing and verification
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
"""
Create a JWT access token
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a password against a hash
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Hash a password
"""
return pwd_context.hash(password)

5
app/crud/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""
Import CRUD operations for easier access
"""
from app.crud.task import task
from app.crud.user import user

85
app/crud/base.py Normal file
View File

@ -0,0 +1,85 @@
"""
Base CRUD class with common operations
"""
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.database import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""
Base CRUD class with standard methods
"""
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
Args:
model: A SQLAlchemy model class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
"""
Get object by ID
"""
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
"""
Get multiple objects with pagination
"""
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
"""
Create a new object
"""
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
"""
Update an object
"""
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
"""
Remove an object
"""
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

179
app/crud/task.py Normal file
View File

@ -0,0 +1,179 @@
"""
CRUD operations for tasks
"""
from datetime import datetime
from typing import List, Optional
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.task import Task, TaskPriority, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate
class CRUDTask(CRUDBase[Task, TaskCreate, TaskUpdate]):
"""
CRUD operations for Task model
"""
def create_with_owner(
self, db: Session, *, obj_in: TaskCreate, owner_id: int
) -> Task:
"""
Create a new task with owner
"""
db_obj = Task(
title=obj_in.title,
description=obj_in.description,
status=obj_in.status,
priority=obj_in.priority,
due_date=obj_in.due_date,
owner_id=owner_id,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
is_deleted=False,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_multi_by_owner(
self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
) -> List[Task]:
"""
Get multiple tasks by owner with pagination
"""
return (
db.query(self.model)
.filter(Task.owner_id == owner_id, not Task.is_deleted)
.offset(skip)
.limit(limit)
.all()
)
def get_multi_by_owner_and_status(
self, db: Session, *, owner_id: int, status: TaskStatus,
skip: int = 0, limit: int = 100
) -> List[Task]:
"""
Get multiple tasks by owner and status with pagination
"""
return (
db.query(self.model)
.filter(
Task.owner_id == owner_id,
Task.status == status,
not Task.is_deleted
)
.offset(skip)
.limit(limit)
.all()
)
def get_multi_by_owner_and_priority(
self, db: Session, *, owner_id: int, priority: TaskPriority,
skip: int = 0, limit: int = 100
) -> List[Task]:
"""
Get multiple tasks by owner and priority with pagination
"""
return (
db.query(self.model)
.filter(
Task.owner_id == owner_id,
Task.priority == priority,
not Task.is_deleted
)
.offset(skip)
.limit(limit)
.all()
)
def get_multi_by_owner_and_search(
self, db: Session, *, owner_id: int, search: str,
skip: int = 0, limit: int = 100
) -> List[Task]:
"""
Get multiple tasks by owner and search term with pagination
"""
search_pattern = f"%{search}%"
return (
db.query(self.model)
.filter(
Task.owner_id == owner_id,
or_(
Task.title.ilike(search_pattern),
Task.description.ilike(search_pattern)
),
not Task.is_deleted
)
.offset(skip)
.limit(limit)
.all()
)
def get_multi_by_owner_and_due_date_range(
self, db: Session, *, owner_id: int, start_date: datetime, end_date: datetime,
skip: int = 0, limit: int = 100
) -> List[Task]:
"""
Get multiple tasks by owner and due date range with pagination
"""
return (
db.query(self.model)
.filter(
Task.owner_id == owner_id,
Task.due_date >= start_date,
Task.due_date <= end_date,
not Task.is_deleted
)
.offset(skip)
.limit(limit)
.all()
)
def get_by_id_and_owner(
self, db: Session, *, id: int, owner_id: int
) -> Optional[Task]:
"""
Get a task by ID and owner
"""
return (
db.query(self.model)
.filter(
Task.id == id,
Task.owner_id == owner_id,
not Task.is_deleted
)
.first()
)
def update_status(
self, db: Session, *, db_obj: Task, status: TaskStatus
) -> Task:
"""
Update a task status
"""
db_obj.status = status
db_obj.updated_at = datetime.utcnow()
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def soft_delete(
self, db: Session, *, db_obj: Task
) -> Task:
"""
Mark a task as deleted (soft delete)
"""
db_obj.is_deleted = True
db_obj.updated_at = datetime.utcnow()
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
task = CRUDTask(Task)

86
app/crud/user.py Normal file
View File

@ -0,0 +1,86 @@
"""
CRUD operations for users
"""
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
"""
CRUD operations for User model
"""
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
"""
Get a user by email
"""
return db.query(User).filter(User.email == email).first()
def get_by_username(self, db: Session, *, username: str) -> Optional[User]:
"""
Get a user by username
"""
return db.query(User).filter(User.username == username).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
"""
Create a new user with hashed password
"""
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
is_active=obj_in.is_active,
is_superuser=False,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
"""
Update a user, handling password hashing if needed
"""
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
"""
Authenticate a user with email and password
"""
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
"""
Check if user is active
"""
return user.is_active
def is_superuser(self, user: User) -> bool:
"""
Check if user is superuser
"""
return user.is_superuser
user = CRUDUser(User)

0
app/db/__init__.py Normal file
View File

38
app/db/database.py Normal file
View File

@ -0,0 +1,38 @@
"""
Database configuration module
"""
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Define the database directory
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
# SQLite database URL
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
# Create SQLAlchemy engine
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # SQLite specific
)
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class
Base = declarative_base()
# Database dependency
def get_db():
"""
Dependency for database session
"""
db = SessionLocal()
try:
yield db
finally:
db.close()

5
app/models/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""
Import models for easier access
"""
from app.models.task import Task, TaskPriority, TaskStatus
from app.models.user import User

56
app/models/task.py Normal file
View File

@ -0,0 +1,56 @@
"""
Task model for the database
"""
import enum
from datetime import datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
)
from sqlalchemy.orm import relationship
from app.db.database import Base
class TaskPriority(str, enum.Enum):
"""
Task priority levels
"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class TaskStatus(str, enum.Enum):
"""
Task status options
"""
TODO = "todo"
IN_PROGRESS = "in_progress"
DONE = "done"
class Task(Base):
"""
Task model representing tasks in the system
"""
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
status = Column(Enum(TaskStatus), default=TaskStatus.TODO)
priority = Column(Enum(TaskPriority), default=TaskPriority.MEDIUM)
owner_id = Column(Integer, ForeignKey("users.id"))
due_date = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
is_deleted = Column(Boolean, default=False)
# Relationship with user (many-to-one)
owner = relationship("User", back_populates="tasks")

24
app/models/user.py Normal file
View File

@ -0,0 +1,24 @@
"""
User model for the database
"""
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.database import Base
class User(Base):
"""
User model representing users in the system
"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
# Relationship with tasks (one-to-many)
tasks = relationship("Task", back_populates="owner")

5
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""
Import schemas for easier access
"""
from app.schemas.task import Task, TaskCreate, TaskInDB, TaskUpdate
from app.schemas.user import Token, TokenPayload, User, UserCreate, UserInDB, UserUpdate

65
app/schemas/task.py Normal file
View File

@ -0,0 +1,65 @@
"""
Task schemas for API request and response validation
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
from app.models.task import TaskPriority, TaskStatus
class TaskBase(BaseModel):
"""
Base task schema with common attributes
"""
title: str = Field(..., min_length=1, max_length=255)
description: Optional[str] = None
status: Optional[TaskStatus] = TaskStatus.TODO
priority: Optional[TaskPriority] = TaskPriority.MEDIUM
due_date: Optional[datetime] = None
class TaskCreate(TaskBase):
"""
Schema for task creation
"""
pass
class TaskUpdate(BaseModel):
"""
Schema for task update
"""
title: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
status: Optional[TaskStatus] = None
priority: Optional[TaskPriority] = None
due_date: Optional[datetime] = None
is_deleted: Optional[bool] = None
class TaskInDBBase(TaskBase):
"""
Base schema for task in database
"""
id: int
owner_id: int
created_at: datetime
updated_at: datetime
is_deleted: bool
class Config:
"""
Pydantic config
"""
orm_mode = True
class Task(TaskInDBBase):
"""
Schema for task response
"""
pass
class TaskInDB(TaskInDBBase):
"""
Schema for task in database
"""
pass

97
app/schemas/user.py Normal file
View File

@ -0,0 +1,97 @@
"""
User schemas for API request and response validation
"""
import re
from typing import Optional
from pydantic import BaseModel, EmailStr, Field, validator
class UserBase(BaseModel):
"""
Base user schema with common attributes
"""
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
is_active: Optional[bool] = True
class UserCreate(UserBase):
"""
Schema for user creation with password
"""
password: str = Field(..., min_length=8)
@validator('password')
def password_strength(cls, v):
"""
Validate password strength
"""
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain at least one digit')
return v
class UserUpdate(BaseModel):
"""
Schema for user update
"""
email: Optional[EmailStr] = None
username: Optional[str] = Field(None, min_length=3, max_length=50)
password: Optional[str] = Field(None, min_length=8)
is_active: Optional[bool] = None
@validator('password')
def password_strength(cls, v):
"""
Validate password strength if provided
"""
if v is None:
return v
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain at least one digit')
return v
class UserInDBBase(UserBase):
"""
Base schema for user in database
"""
id: int
is_superuser: Optional[bool] = False
class Config:
"""
Pydantic config
"""
orm_mode = True
class User(UserInDBBase):
"""
Schema for user response
"""
pass
class UserInDB(UserInDBBase):
"""
Schema for user in database with password
"""
hashed_password: str
class Token(BaseModel):
"""
Schema for access token
"""
access_token: str
token_type: str
class TokenPayload(BaseModel):
"""
Schema for token payload
"""
sub: Optional[int] = None

72
main.py Normal file
View File

@ -0,0 +1,72 @@
"""
Task Manager API - Main application entry point
"""
from app.api.v1.api import api_router
from app.core.config import settings
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.utils import get_openapi
from fastapi.responses import JSONResponse
# Set up FastAPI application
app = FastAPI(
title="Task Manager API",
description="API for managing tasks and users",
version="0.1.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=[origin for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)
# Add health check endpoint
@app.get("/health", tags=["Health"])
async def health_check():
"""
Health check endpoint to verify API is running properly
"""
return {"status": "healthy"}
# Custom OpenAPI schema
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title=app.title,
version=app.version,
description=app.description,
routes=app.routes,
)
# Custom modifications to OpenAPI schema can be added here
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Global exception handler for unhandled exceptions"""
return JSONResponse(
status_code=500,
content={"detail": "Internal server error", "type": "internal_error"}
)
if __name__ == "__main__":
import uvicorn
# Run the application with uvicorn when script is executed directly
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

18
migrations/README Normal file
View File

@ -0,0 +1,18 @@
Generic single-database configuration with SQLite.
This directory contains Alembic migrations for the database schema.
To generate a new migration:
```
alembic revision --autogenerate -m "Description of changes"
```
To apply migrations:
```
alembic upgrade head
```
To downgrade:
```
alembic downgrade -1
```

84
migrations/env.py Normal file
View File

@ -0,0 +1,84 @@
"""
Alembic environment module
"""
from logging.config import fileConfig
from alembic import context
from app.db.database import Base
# Import the database models for Alembic
from sqlalchemy import engine_from_config, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite # Key configuration for SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,64 @@
"""initial migration
Revision ID: 1a1a1a1a1a1a
Revises:
Create Date: 2023-06-21 00:00:00.000000
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '1a1a1a1a1a1a'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create users table
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
# Create tasks table with enum types
op.create_table('tasks',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(length=255), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('status',
sa.Enum('todo', 'in_progress', 'done', name='taskstatus'),
nullable=True),
sa.Column('priority',
sa.Enum('low', 'medium', 'high', name='taskpriority'),
nullable=True),
sa.Column('owner_id', sa.Integer(), nullable=True),
sa.Column('due_date', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('is_deleted', sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_tasks_id'), 'tasks', ['id'], unique=False)
def downgrade():
# Drop tasks table
op.drop_index(op.f('ix_tasks_id'), table_name='tasks')
op.drop_table('tasks')
# Drop users table
op.drop_index(op.f('ix_users_username'), table_name='users')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

17
pyproject.toml Normal file
View File

@ -0,0 +1,17 @@
[tool.ruff]
line-length = 88
indent-width = 4
target-version = "py310"
src = ["app"]
[tool.ruff.lint]
# Enable pycodestyle ('E'), Pyflakes ('F'), and isort ('I')
select = ["E", "F", "I"]
ignore = []
# Allow unused variables that start with an underscore.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[tool.ruff.lint.per-file-ignores]
# Ignore imported but unused in __init__.py files
"__init__.py" = ["F401"]

14
requirements.txt Normal file
View File

@ -0,0 +1,14 @@
fastapi>=0.100.0
uvicorn>=0.22.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
sqlalchemy>=2.0.0
alembic>=1.11.0
python-dotenv>=1.0.0
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.6
ruff>=0.0.270
httpx>=0.24.1
pytest>=7.3.1
pytest-asyncio>=0.21.0

171
test_api.py Normal file
View File

@ -0,0 +1,171 @@
"""
Test script for the Task Manager API
"""
import os
import unittest
from fastapi.testclient import TestClient
from main import app
from app.db.database import Base, engine
# Create test client
client = TestClient(app)
class TestAPI(unittest.TestCase):
"""
Test class for API endpoints
"""
def setUp(self):
# Create tables
Base.metadata.create_all(bind=engine)
def tearDown(self):
# Drop tables
Base.metadata.drop_all(bind=engine)
def test_health_endpoint(self):
"""
Test the health endpoint
"""
response = client.get("/health")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), {"status": "healthy"})
def test_user_registration(self):
"""
Test user registration
"""
user_data = {
"email": "test@example.com",
"username": "testuser",
"password": "Password123"
}
response = client.post(
"/api/v1/auth/register",
json=user_data
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["email"], user_data["email"])
self.assertEqual(data["username"], user_data["username"])
self.assertTrue(data["is_active"])
self.assertFalse(data["is_superuser"])
def test_login(self):
"""
Test user login
"""
# First register a user
user_data = {
"email": "test@example.com",
"username": "testuser",
"password": "Password123"
}
client.post(
"/api/v1/auth/register",
json=user_data
)
# Then try to login
login_data = {
"username": user_data["email"],
"password": user_data["password"]
}
response = client.post(
"/api/v1/auth/login",
data=login_data
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIn("access_token", data)
self.assertEqual(data["token_type"], "bearer")
def test_task_crud(self):
"""
Test task CRUD operations
"""
# Register a user
user_data = {
"email": "test@example.com",
"username": "testuser",
"password": "Password123"
}
client.post(
"/api/v1/auth/register",
json=user_data
)
# Login to get token
login_data = {
"username": user_data["email"],
"password": user_data["password"]
}
login_response = client.post(
"/api/v1/auth/login",
data=login_data
)
token = login_response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# Create a task
task_data = {
"title": "Test Task",
"description": "Test Description",
"priority": "high"
}
response = client.post(
"/api/v1/tasks",
json=task_data,
headers=headers
)
self.assertEqual(response.status_code, 200)
created_task = response.json()
self.assertEqual(created_task["title"], task_data["title"])
self.assertEqual(created_task["description"], task_data["description"])
self.assertEqual(created_task["priority"], task_data["priority"])
task_id = created_task["id"]
# Read the task
response = client.get(
f"/api/v1/tasks/{task_id}",
headers=headers
)
self.assertEqual(response.status_code, 200)
read_task = response.json()
self.assertEqual(read_task["id"], task_id)
self.assertEqual(read_task["title"], task_data["title"])
# Update the task
update_data = {
"title": "Updated Task",
"status": "in_progress"
}
response = client.put(
f"/api/v1/tasks/{task_id}",
json=update_data,
headers=headers
)
self.assertEqual(response.status_code, 200)
updated_task = response.json()
self.assertEqual(updated_task["title"], update_data["title"])
self.assertEqual(updated_task["status"], update_data["status"])
# Delete the task
response = client.delete(
f"/api/v1/tasks/{task_id}",
headers=headers
)
self.assertEqual(response.status_code, 200)
deleted_task = response.json()
self.assertEqual(deleted_task["id"], task_id)
self.assertTrue(deleted_task["is_deleted"])
# Try to get the deleted task
response = client.get(
f"/api/v1/tasks/{task_id}",
headers=headers
)
self.assertEqual(response.status_code, 404)
if __name__ == "__main__":
unittest.main()