Implement Task Manager API with FastAPI and SQLite

- Set up project structure and dependencies
- Create database models for tasks and users with SQLAlchemy
- Configure Alembic for database migrations
- Implement authentication system with JWT tokens
- Create CRUD API endpoints for tasks and users
- Add health check endpoint
- Update README with documentation
This commit is contained in:
Automated Action 2025-06-12 18:14:56 +00:00
parent 58769e0329
commit d1c05cbd6e
38 changed files with 1261 additions and 2 deletions

133
README.md
View File

@ -1,3 +1,132 @@
# FastAPI Application
# Task Manager API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A RESTful API for managing tasks and users built with FastAPI and SQLite.
## Features
- User authentication with JWT tokens
- User registration and management
- Task management with CRUD operations
- Task filtering and pagination
- Health check endpoint
- OpenAPI documentation
## Tech Stack
- **FastAPI**: Modern, fast web framework for building APIs
- **SQLAlchemy**: SQL toolkit and ORM
- **Alembic**: Database migration tool
- **SQLite**: Lightweight, file-based database
- **Pydantic**: Data validation and settings management
- **JWT**: JSON Web Tokens for authentication
- **Uvicorn**: ASGI server for running the application
## Project Structure
```
taskmanagerapi/
├── app/ # Application package
│ ├── api/ # API endpoints
│ │ └── v1/ # API version 1
│ │ ├── endpoints/ # API endpoint modules
│ │ └── api.py # API router
│ │
│ ├── core/ # Core modules
│ │ ├── config/ # Configuration
│ │ └── security/ # Security utilities
│ │
│ ├── crud/ # CRUD operations
│ ├── db/ # Database setup and session
│ ├── models/ # SQLAlchemy models
│ └── schemas/ # Pydantic schemas
├── migrations/ # Alembic migrations
│ └── versions/ # Migration versions
├── alembic.ini # Alembic configuration
├── main.py # Application entry point
├── requirements.txt # Project dependencies
└── README.md # Project documentation
```
## Environment Variables
The application uses the following environment variables:
| Variable | Description | Default |
|----------|-------------|---------|
| `SECRET_KEY` | Secret key for JWT token generation | Auto-generated secure token |
| `ACCESS_TOKEN_EXPIRE_MINUTES` | JWT token expiration time in minutes | 11520 (8 days) |
| `DATABASE_URL` | SQLite database URL | `sqlite:////app/storage/db/db.sqlite` |
## Getting Started
### Prerequisites
- Python 3.8+
### Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd taskmanagerapi
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run database migrations:
```bash
alembic upgrade head
```
### Running the Application
Run the application with Uvicorn:
```bash
uvicorn main:app --reload
```
The API will be available at http://127.0.0.1:8000.
## API Documentation
Once the application is running, you can access:
- Interactive API documentation: http://127.0.0.1:8000/docs
- Alternative API documentation: http://127.0.0.1:8000/redoc
- OpenAPI schema: http://127.0.0.1:8000/openapi.json
## API Endpoints
### Authentication
- `POST /api/v1/auth/login` - Get access token
- `POST /api/v1/auth/register` - Register a new user
### Users
- `GET /api/v1/users/me` - Get current user
- `PUT /api/v1/users/me` - Update current user
- `GET /api/v1/users/{user_id}` - Get user by ID (admin only)
- `GET /api/v1/users/` - Get all users (admin only)
### Tasks
- `GET /api/v1/tasks/` - Get all user's tasks
- `POST /api/v1/tasks/` - Create a new task
- `GET /api/v1/tasks/{task_id}` - Get task by ID
- `PUT /api/v1/tasks/{task_id}` - Update a task
- `DELETE /api/v1/tasks/{task_id}` - Delete a task
### Health Check
- `GET /health` - Check API health status

86
alembic.ini Normal file
View File

@ -0,0 +1,86 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example with absolute path
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/api/__init__.py Normal file
View File

0
app/api/v1/__init__.py Normal file
View File

8
app/api/v1/api.py Normal file
View File

@ -0,0 +1,8 @@
from fastapi import APIRouter
from app.api.v1.endpoints import auth, tasks, users
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(tasks.router, prefix="/tasks", tags=["tasks"])

View File

View File

@ -0,0 +1,69 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, schemas
from app.core.config import settings
from app.core.security import create_access_token
from app.db.session import get_db
router = APIRouter()
@router.post("/login", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
elif not crud.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=schemas.User)
def register_user(
*,
db: Session = Depends(get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Register a new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists",
)
username_exists = crud.user.get_by_username(db, username=user_in.username)
if username_exists:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
user = crud.user.create(db, obj_in=user_in)
return user

View File

@ -0,0 +1,98 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.core.security import get_current_active_user
from app.db.session import get_db
from app.models.user import User
router = APIRouter()
@router.get("/", response_model=List[schemas.Task])
async def read_tasks(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve tasks.
"""
tasks = crud.task.get_multi_by_owner(
db=db, owner_id=current_user.id, skip=skip, limit=limit
)
return tasks
@router.post("/", response_model=schemas.Task)
async def create_task(
*,
db: Session = Depends(get_db),
task_in: schemas.TaskCreate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Create new task.
"""
task = crud.task.create_with_owner(db=db, obj_in=task_in, owner_id=current_user.id)
return task
@router.get("/{task_id}", response_model=schemas.Task)
async def read_task(
*,
db: Session = Depends(get_db),
task_id: int,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get task by ID.
"""
task = crud.task.get_by_id_and_owner(db=db, task_id=task_id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
return task
@router.put("/{task_id}", response_model=schemas.Task)
async def update_task(
*,
db: Session = Depends(get_db),
task_id: int,
task_in: schemas.TaskUpdate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update a task.
"""
task = crud.task.get_by_id_and_owner(db=db, task_id=task_id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
task = crud.task.update(db=db, db_obj=task, obj_in=task_in)
return task
@router.delete("/{task_id}", response_model=schemas.Task)
async def delete_task(
*,
db: Session = Depends(get_db),
task_id: int,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Delete a task.
"""
task = crud.task.get_by_id_and_owner(db=db, task_id=task_id, owner_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
task = crud.task.remove(db=db, id=task_id)
return task

View File

@ -0,0 +1,79 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.core.security import get_current_active_user
from app.db.session import get_db
from app.models.user import User
router = APIRouter()
@router.get("/me", response_model=schemas.User)
async def read_users_me(
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=schemas.User)
async def update_user_me(
*,
db: Session = Depends(get_db),
user_in: schemas.UserUpdate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update own user.
"""
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
async def read_user_by_id(
user_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return user
@router.get("/", response_model=List[schemas.User])
async def read_users(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve users. Only superusers can access this endpoint.
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users

View File

@ -0,0 +1,3 @@
from .settings import settings
__all__ = ["settings"]

View File

@ -0,0 +1,28 @@
import os
import secrets
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Task Manager API"
DESCRIPTION: str = "API for managing tasks and users"
VERSION: str = "0.1.0"
# Security
SECRET_KEY: str = os.environ.get("SECRET_KEY", secrets.token_urlsafe(32))
# 8 days expiration
ACCESS_TOKEN_EXPIRE_MINUTES: int = int(os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 60 * 24 * 8))
# Database
SQLALCHEMY_DATABASE_URL: str = os.environ.get(
"DATABASE_URL", "sqlite:////app/storage/db/db.sqlite"
)
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

View File

@ -0,0 +1,15 @@
from .security import (
create_access_token,
get_current_active_user,
get_current_user,
get_password_hash,
verify_password,
)
__all__ = [
"create_access_token",
"verify_password",
"get_password_hash",
"get_current_user",
"get_current_active_user"
]

View File

@ -0,0 +1,77 @@
from datetime import datetime, timedelta
from typing import Any, Union
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import TokenPayload
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
async def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, SECRET_KEY, algorithms=[ALGORITHM]
)
token_data = TokenPayload(**payload)
if token_data.sub is None:
raise credentials_exception
except JWTError as e:
raise credentials_exception from e
from app.crud import user as user_crud
user = user_crud.get(db, id=token_data.sub)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

4
app/crud/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .crud_task import task
from .crud_user import user
__all__ = ["user", "task"]

66
app/crud/base.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

42
app/crud/crud_task.py Normal file
View File

@ -0,0 +1,42 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskUpdate
class CRUDTask(CRUDBase[Task, TaskCreate, TaskUpdate]):
def get_multi_by_owner(
self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
) -> List[Task]:
return (
db.query(self.model)
.filter(Task.owner_id == owner_id)
.offset(skip)
.limit(limit)
.all()
)
def create_with_owner(
self, db: Session, *, obj_in: TaskCreate, owner_id: int
) -> Task:
obj_in_data = obj_in.model_dump()
db_obj = self.model(**obj_in_data, owner_id=owner_id)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_by_id_and_owner(
self, db: Session, *, task_id: int, owner_id: int
) -> Optional[Task]:
return (
db.query(self.model)
.filter(Task.id == task_id, Task.owner_id == owner_id)
.first()
)
task = CRUDTask(Task)

59
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,59 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_by_username(self, db: Session, *, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
is_superuser=obj_in.is_superuser,
is_active=obj_in.is_active,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

0
app/db/__init__.py Normal file
View File

3
app/db/base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

14
app/db/base_class.py Normal file
View File

@ -0,0 +1,14 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, Integer
from sqlalchemy.ext.declarative import declared_attr
class Base:
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

5
app/db/base_imports.py Normal file
View File

@ -0,0 +1,5 @@
# Import all the models, so that Base has them before being
# imported by Alembic
from app.db.base import Base # noqa
from app.models.user import User # noqa
from app.models.task import Task # noqa

26
app/db/init_db.py Normal file
View File

@ -0,0 +1,26 @@
from sqlalchemy.orm import Session
from app import crud, schemas
from app.db import base # noqa: F401
# make sure all SQL Alchemy models are imported (app.db.base) before initializing DB
# otherwise, SQL Alchemy might fail to initialize relationships properly
# for more details: https://github.com/tiangolo/full-stack-fastapi-postgresql/issues/28
def init_db(db: Session) -> None:
# Tables should be created with Alembic migrations
# But if you don't want to use migrations, create
# the tables uncommenting the next line
# Base.metadata.create_all(bind=engine)
# Create a superuser if it doesn't exist
user = crud.user.get_by_email(db, email="admin@example.com")
if not user:
user_in = schemas.UserCreate(
email="admin@example.com",
username="admin",
password="adminpassword",
is_superuser=True,
)
crud.user.create(db, obj_in=user_in)

26
app/db/session.py Normal file
View File

@ -0,0 +1,26 @@
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Create storage directory if it doesn't exist
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = settings.SQLALCHEMY_DATABASE_URL
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

4
app/models/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .task import Task
from .user import User
__all__ = ["User", "Task"]

33
app/models/task.py Normal file
View File

@ -0,0 +1,33 @@
import enum
from sqlalchemy import Boolean, Column, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from app.db.base import Base
from app.db.base_class import Base as BaseClass
class TaskStatus(str, enum.Enum):
TODO = "todo"
IN_PROGRESS = "in_progress"
DONE = "done"
class TaskPriority(str, enum.Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class Task(Base, BaseClass):
title = Column(String(100), nullable=False, index=True)
description = Column(Text, nullable=True)
status = Column(Enum(TaskStatus), default=TaskStatus.TODO, nullable=False)
priority = Column(Enum(TaskPriority), default=TaskPriority.MEDIUM, nullable=False)
is_completed = Column(Boolean, default=False)
# Foreign key to user
owner_id = Column(Integer, ForeignKey("user.id"), nullable=False)
# Relationship with user
owner = relationship("User", back_populates="tasks")

16
app/models/user.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Boolean, Column, String
from sqlalchemy.orm import relationship
from app.db.base import Base
from app.db.base_class import Base as BaseClass
class User(Base, BaseClass):
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
# Relationship with tasks
tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")

9
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,9 @@
from .task import Task, TaskCreate, TaskInDB, TaskUpdate
from .token import Token, TokenPayload
from .user import User, UserCreate, UserInDB, UserUpdate
__all__ = [
"User", "UserCreate", "UserUpdate", "UserInDB",
"Task", "TaskCreate", "TaskUpdate", "TaskInDB",
"Token", "TokenPayload"
]

49
app/schemas/task.py Normal file
View File

@ -0,0 +1,49 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
from app.models.task import TaskPriority, TaskStatus
# Shared properties
class TaskBase(BaseModel):
title: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = None
status: Optional[TaskStatus] = None
priority: Optional[TaskPriority] = None
is_completed: Optional[bool] = None
# Properties to receive on task creation
class TaskCreate(TaskBase):
title: str = Field(..., min_length=1, max_length=100)
status: TaskStatus = TaskStatus.TODO
priority: TaskPriority = TaskPriority.MEDIUM
# Properties to receive on task update
class TaskUpdate(TaskBase):
pass
# Properties shared by models stored in DB
class TaskInDBBase(TaskBase):
id: int
title: str
owner_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Properties to return to client
class Task(TaskInDBBase):
pass
# Properties stored in DB
class TaskInDB(TaskInDBBase):
pass

12
app/schemas/token.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

43
app/schemas/user.py Normal file
View File

@ -0,0 +1,43 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
is_active: Optional[bool] = True
is_superuser: bool = False
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
password: str = Field(..., min_length=8)
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = Field(None, min_length=8)
class UserInDBBase(UserBase):
id: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

45
main.py Normal file
View File

@ -0,0 +1,45 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
description=settings.DESCRIPTION,
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/")
async def root():
"""Root endpoint returning service information"""
return {
"title": settings.PROJECT_NAME,
"docs": "/docs",
"health": "/health"
}
@app.get("/health", status_code=200)
async def health_check():
"""Health check endpoint"""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

0
migrations/__init__.py Normal file
View File

91
migrations/env.py Normal file
View File

@ -0,0 +1,91 @@
import os
import sys
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# Ensure app directory is in the Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.db.base_imports import Base # noqa
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
# Check if the dialect is SQLite to use batch mode
is_sqlite = connection.dialect.name == "sqlite"
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # Use batch mode for SQLite
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

View File

@ -0,0 +1,68 @@
"""init db
Revision ID: 001_init_db
Revises:
Create Date: 2023-11-14 12:00:00.000000
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '001_init_db'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create users table
op.create_table(
'user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('email', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
op.create_index(op.f('ix_user_username'), 'user', ['username'], unique=True)
# Create tasks table
op.create_table(
'task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('title', sa.String(100), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('status', sa.Enum('todo', 'in_progress', 'done', name='taskstatus'), nullable=False),
sa.Column('priority', sa.Enum('low', 'medium', 'high', name='taskpriority'), nullable=False),
sa.Column('is_completed', sa.Boolean(), nullable=True),
sa.Column('owner_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['owner_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_task_id'), 'task', ['id'], unique=False)
op.create_index(op.f('ix_task_title'), 'task', ['title'], unique=False)
def downgrade():
# Drop tasks table
op.drop_index(op.f('ix_task_title'), table_name='task')
op.drop_index(op.f('ix_task_id'), table_name='task')
op.drop_table('task')
# Drop enum types (only works in PostgreSQL, not SQLite)
# No need to drop enums in SQLite
# Drop users table
op.drop_index(op.f('ix_user_username'), table_name='user')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')

16
pyproject.toml Normal file
View File

@ -0,0 +1,16 @@
[tool.ruff]
line-length = 120
target-version = "py38"
[tool.ruff.lint]
select = ["E", "F", "I", "B", "UP"]
ignore = ["B008"] # Allow function calls in argument defaults for FastAPI dependencies
[tool.ruff.lint.isort]
known-third-party = ["fastapi", "pydantic", "sqlalchemy", "alembic", "jose", "passlib"]
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

12
requirements.txt Normal file
View File

@ -0,0 +1,12 @@
fastapi>=0.104.1
uvicorn>=0.24.0
sqlalchemy>=2.0.23
pydantic>=2.4.2
pydantic-settings>=2.0.3
alembic>=1.12.1
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.6
email-validator>=2.1.0
ruff>=0.1.5
python-dotenv>=1.0.0