Add user authentication to the Todo app

- Created User model and schemas
- Implemented secure password hashing with bcrypt
- Added JWT token-based authentication
- Created user registration and login endpoints
- Added authentication to todo routes
- Updated todos to be associated with users
- Created migration script for the user table
- Updated documentation with auth information
This commit is contained in:
Automated Action 2025-05-19 13:45:22 +00:00
parent 2204ae214d
commit 8fefbb7c13
19 changed files with 550 additions and 45 deletions

View File

@ -1,10 +1,13 @@
# Simple Todo App with FastAPI and SQLite
A simple Todo API application built with FastAPI and SQLite that provides CRUD operations for todo items.
A simple Todo API application built with FastAPI and SQLite that provides CRUD operations for todo items with user authentication.
## Features
- Create, read, update, and delete todo items
- User authentication with JWT tokens
- User registration and login
- Secure password hashing with bcrypt
- RESTful API with FastAPI
- SQLite database with SQLAlchemy ORM
- Database migrations with Alembic
@ -19,19 +22,26 @@ simpletodoapp/
├── migrations/ # Database migration scripts
├── app/ # Application package
│ ├── api/ # API routes
│ │ ├── deps.py # Dependency injection and auth
│ │ └── routes/ # Route modules
│ │ ├── auth.py # Authentication endpoints
│ │ ├── health.py # Health check endpoint
│ │ └── todos.py # Todo endpoints
│ ├── core/ # Core modules
│ │ └── config.py # App configuration
│ │ ├── config.py # App configuration
│ │ └── security.py # Security utilities
│ ├── crud/ # CRUD operations
│ │ └── todo.py # Todo CRUD operations
│ │ ├── todo.py # Todo CRUD operations
│ │ └── user.py # User CRUD operations
│ ├── db/ # Database setup
│ │ └── session.py # DB session and engine
│ ├── models/ # SQLAlchemy models
│ │ └── todo.py # Todo model
│ │ ├── todo.py # Todo model
│ │ └── user.py # User model
│ └── schemas/ # Pydantic schemas
│ └── todo.py # Todo schemas
│ ├── todo.py # Todo schemas
│ ├── user.py # User schemas
│ └── token.py # Token schemas
├── main.py # FastAPI application creation
└── requirements.txt # Python dependencies
```
@ -56,7 +66,12 @@ cd simpletodoapp
pip install -r requirements.txt
```
3. Run the application:
3. Apply the database migrations:
```bash
alembic upgrade head
```
4. Run the application:
```bash
uvicorn main:app --reload
```
@ -74,9 +89,14 @@ After starting the application, you can access the API documentation at:
### Health Check
- `GET /health` - Check the health of the application and database connection
### Todo Operations
- `GET /api/v1/todos` - Retrieve all todos (with pagination)
- `POST /api/v1/todos` - Create a new todo
### Authentication
- `POST /api/v1/auth/register` - Register a new user
- `POST /api/v1/auth/login` - Login and get access token
- `GET /api/v1/auth/me` - Get current user information
### Todo Operations (Requires Authentication)
- `GET /api/v1/todos` - Retrieve all todos for current user (with pagination)
- `POST /api/v1/todos` - Create a new todo for current user
- `GET /api/v1/todos/{todo_id}` - Retrieve a specific todo
- `PUT /api/v1/todos/{todo_id}` - Update a specific todo
- `DELETE /api/v1/todos/{todo_id}` - Delete a specific todo

57
app/api/deps.py Normal file
View File

@ -0,0 +1,57 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import get_db
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
"""
Validate access token and return current user
"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[security.ALGORITHM])
token_data = schemas.TokenPayload(**payload)
except (JWTError, ValidationError) as e:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
) from e
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
"""
Check if current user is active
"""
if not current_user.is_active:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
"""
Check if current user is a superuser
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges"
)
return current_user

77
app/api/routes/auth.py Normal file
View File

@ -0,0 +1,77 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register", response_model=schemas.User)
def register(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Register a new user
"""
# Check if user with same email exists
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists",
)
# Check if user with same username exists
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
# Create the user
user = crud.user.create(db, obj_in=user_in)
return user
@router.post("/login", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(db, email=form_data.username, password=form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
if not crud.user.is_active(user):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(user.id, expires_delta=access_token_expires),
"token_type": "bearer",
}
@router.get("/me", response_model=schemas.User)
def read_users_me(
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user
"""
return current_user

View File

@ -3,7 +3,8 @@ from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app import crud
from app import crud, models
from app.api import deps
from app.db.session import get_db
from app.schemas.todo import Todo, TodoCreate, TodoUpdate
@ -15,55 +16,76 @@ def get_todos(
skip: int = Query(0, description="Skip the first N items"),
limit: int = Query(100, description="Limit the number of items returned"),
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_active_user),
):
"""
Get all todos with pagination
Get all todos for the current user with pagination
"""
return crud.todo.get_todos(db=db, skip=skip, limit=limit)
return crud.todo.get_todos(db=db, user_id=current_user.id, skip=skip, limit=limit)
@router.post("/", response_model=Todo, status_code=status.HTTP_201_CREATED)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
def create_todo(
todo: TodoCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_active_user),
):
"""
Create a new todo
Create a new todo for the current user
"""
return crud.todo.create_todo(db=db, todo=todo)
return crud.todo.create_todo(db=db, todo=todo, user_id=current_user.id)
@router.get("/{todo_id}", response_model=Todo)
def get_todo(todo_id: int, db: Session = Depends(get_db)):
def get_todo(
todo_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_active_user),
):
"""
Get a specific todo by ID
Get a specific todo by ID, ensuring it belongs to the current user
"""
db_todo = crud.todo.get_todo(db=db, todo_id=todo_id)
db_todo = crud.todo.get_todo(db=db, todo_id=todo_id, user_id=current_user.id)
if db_todo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=f"Todo with ID {todo_id} not found"
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Todo with ID {todo_id} not found",
)
return db_todo
@router.put("/{todo_id}", response_model=Todo)
def update_todo(todo_id: int, todo: TodoUpdate, db: Session = Depends(get_db)):
def update_todo(
todo_id: int,
todo: TodoUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_active_user),
):
"""
Update a todo
Update a todo, ensuring it belongs to the current user
"""
db_todo = crud.todo.update_todo(db=db, todo_id=todo_id, todo=todo)
db_todo = crud.todo.update_todo(db=db, todo_id=todo_id, todo=todo, user_id=current_user.id)
if db_todo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=f"Todo with ID {todo_id} not found"
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Todo with ID {todo_id} not found",
)
return db_todo
@router.delete("/{todo_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
def delete_todo(
todo_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(deps.get_current_active_user),
):
"""
Delete a todo
Delete a todo, ensuring it belongs to the current user
"""
success = crud.todo.delete_todo(db=db, todo_id=todo_id)
success = crud.todo.delete_todo(db=db, todo_id=todo_id, user_id=current_user.id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=f"Todo with ID {todo_id} not found"
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Todo with ID {todo_id} not found",
)
return None

View File

@ -1,3 +1,4 @@
import secrets
from typing import List, Union
from pydantic import AnyHttpUrl, validator
@ -8,6 +9,10 @@ class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "SimpleTodoApp"
# Security
SECRET_KEY: str = secrets.token_urlsafe(32)
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
# CORS
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []

39
app/core/security.py Normal file
View File

@ -0,0 +1,39 @@
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""
Creates a JWT token using the subject (typically user ID) and expiration time
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verifies a plain password against a hashed password
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Creates a hash from a plain password
"""
return pwd_context.hash(password)

View File

@ -0,0 +1,8 @@
from app.crud import user # noqa: F401
from app.crud.todo import ( # noqa: F401
create_todo,
delete_todo,
get_todo,
get_todos,
update_todo,
)

View File

@ -6,36 +6,44 @@ from app.models.todo import Todo
from app.schemas.todo import TodoCreate, TodoUpdate
def get_todos(db: Session, skip: int = 0, limit: int = 100) -> List[Todo]:
def get_todos(db: Session, user_id: int, skip: int = 0, limit: int = 100) -> List[Todo]:
"""
Get all todo items with pagination
Get all todo items for a specific user with pagination
"""
return db.query(Todo).offset(skip).limit(limit).all()
return db.query(Todo).filter(Todo.owner_id == user_id).offset(skip).limit(limit).all()
def get_todo(db: Session, todo_id: int) -> Optional[Todo]:
def get_todo(db: Session, todo_id: int, user_id: int = None) -> Optional[Todo]:
"""
Get a specific todo item by ID
Get a specific todo item by ID, optionally filtering by user_id
"""
return db.query(Todo).filter(Todo.id == todo_id).first()
query = db.query(Todo).filter(Todo.id == todo_id)
if user_id is not None:
query = query.filter(Todo.owner_id == user_id)
return query.first()
def create_todo(db: Session, todo: TodoCreate) -> Todo:
def create_todo(db: Session, todo: TodoCreate, user_id: int) -> Todo:
"""
Create a new todo item
"""
db_todo = Todo(title=todo.title, description=todo.description, completed=todo.completed)
db_todo = Todo(
title=todo.title,
description=todo.description,
completed=todo.completed,
owner_id=user_id,
)
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo
def update_todo(db: Session, todo_id: int, todo: TodoUpdate) -> Optional[Todo]:
def update_todo(db: Session, todo_id: int, todo: TodoUpdate, user_id: int = None) -> Optional[Todo]:
"""
Update a todo item
Update a todo item, optionally checking owner
"""
db_todo = get_todo(db, todo_id)
db_todo = get_todo(db, todo_id, user_id)
if not db_todo:
return None
@ -49,11 +57,11 @@ def update_todo(db: Session, todo_id: int, todo: TodoUpdate) -> Optional[Todo]:
return db_todo
def delete_todo(db: Session, todo_id: int) -> bool:
def delete_todo(db: Session, todo_id: int, user_id: int = None) -> bool:
"""
Delete a todo item
Delete a todo item, optionally checking owner
"""
db_todo = get_todo(db, todo_id)
db_todo = get_todo(db, todo_id, user_id)
if not db_todo:
return False

94
app/crud/user.py Normal file
View File

@ -0,0 +1,94 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
def get(db: Session, id: int) -> Optional[User]:
"""
Get a user by ID
"""
return db.query(User).filter(User.id == id).first()
def get_by_email(db: Session, email: str) -> Optional[User]:
"""
Get a user by email
"""
return db.query(User).filter(User.email == email).first()
def get_by_username(db: Session, username: str) -> Optional[User]:
"""
Get a user by username
"""
return db.query(User).filter(User.username == username).first()
def create(db: Session, *, obj_in: UserCreate) -> User:
"""
Create a new user
"""
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
is_active=obj_in.is_active,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]) -> User:
"""
Update a user
"""
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
for field, value in update_data.items():
setattr(db_obj, field, value)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def authenticate(db: Session, *, email: str, password: str) -> Optional[User]:
"""
Authenticate a user
"""
user = get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(user: User) -> bool:
"""
Check if user is active
"""
return user.is_active
def is_superuser(user: User) -> bool:
"""
Check if user is superuser
"""
return user.is_superuser

View File

@ -0,0 +1,2 @@
from app.models.todo import Todo # noqa: F401
from app.models.user import User # noqa: F401

View File

@ -1,4 +1,5 @@
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.session import Base
@ -13,5 +14,9 @@ class Todo(Base):
title = Column(String, index=True)
description = Column(String, nullable=True)
completed = Column(Boolean, default=False)
owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime(timezone=True), default=func.now())
updated_at = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())
# Relationships
owner = relationship("User", back_populates="todos")

23
app/models/user.py Normal file
View File

@ -0,0 +1,23 @@
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.session import Base
class User(Base):
"""Database model for User accounts"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), default=func.now())
updated_at = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())
# Relationships
todos = relationship("Todo", back_populates="owner", cascade="all, delete-orphan")

View File

@ -0,0 +1,3 @@
from app.schemas.todo import Todo, TodoCreate, TodoUpdate # noqa: F401
from app.schemas.token import Token, TokenPayload # noqa: F401
from app.schemas.user import User, UserCreate, UserInDB, UserUpdate # noqa: F401

View File

@ -15,7 +15,7 @@ class TodoBase(BaseModel):
class TodoCreate(TodoBase):
"""Schema for creating a new Todo item"""
pass
# No need to include owner_id here as it will be set from the current user
class TodoUpdate(BaseModel):
@ -30,6 +30,7 @@ class TodoInDBBase(TodoBase):
"""Base schema for Todo items from the database"""
id: int
owner_id: int
created_at: datetime
updated_at: datetime

16
app/schemas/token.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
"""Schema for token response"""
access_token: str
token_type: str
class TokenPayload(BaseModel):
"""Schema for token payload"""
sub: Optional[int] = None

59
app/schemas/user.py Normal file
View File

@ -0,0 +1,59 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr, Field, field_validator
class UserBase(BaseModel):
"""Base schema for User objects"""
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
is_active: bool = True
is_superuser: bool = False
class UserCreate(UserBase):
"""Schema for creating a new user"""
password: str = Field(..., min_length=8, max_length=100)
password_confirm: str
@field_validator("password_confirm")
def passwords_match(cls, v, values):
if "password" in values.data and v != values.data["password"]:
raise ValueError("Passwords do not match")
return v
class UserUpdate(BaseModel):
"""Schema for updating a user"""
email: Optional[EmailStr] = None
username: Optional[str] = None
password: Optional[str] = None
is_active: Optional[bool] = None
is_superuser: Optional[bool] = None
class UserInDBBase(UserBase):
"""Base schema for User objects from the database"""
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class User(UserInDBBase):
"""Schema for User objects returned from the API"""
pass
class UserInDB(UserInDBBase):
"""Schema for User objects with hashed_password"""
hashed_password: str

View File

@ -1,7 +1,7 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import health, todos
from app.api.routes import auth, health, todos
from app.core.config import settings
app = FastAPI(
@ -22,6 +22,7 @@ if settings.BACKEND_CORS_ORIGINS:
)
# Include routers
app.include_router(auth.router, prefix=settings.API_V1_STR)
app.include_router(todos.router, prefix=settings.API_V1_STR)
app.include_router(health.router)

View File

@ -0,0 +1,62 @@
"""add users table and update todos table
Revision ID: 0002
Revises: 0001
Create Date: 2023-11-10
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "0002"
down_revision = "0001"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create users table
op.create_table(
"users",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("email", sa.String(), nullable=False),
sa.Column("username", sa.String(), nullable=False),
sa.Column("hashed_password", sa.String(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False, default=True),
sa.Column("is_superuser", sa.Boolean(), nullable=False, default=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_users_id"), "users", ["id"], unique=False)
op.create_index(op.f("ix_users_email"), "users", ["email"], unique=True)
op.create_index(op.f("ix_users_username"), "users", ["username"], unique=True)
# Add owner_id column to todos table
with op.batch_alter_table("todos") as batch_op:
# First, create a default user to associate with existing todos
# For SQLite, we need to handle this differently
batch_op.add_column(sa.Column("owner_id", sa.Integer(), nullable=True))
# Create a foreign key constraint separately
with op.batch_alter_table("todos") as batch_op:
batch_op.create_foreign_key("fk_todos_owner_id_users", "users", ["owner_id"], ["id"])
def downgrade() -> None:
# Remove foreign key and owner_id column from todos table
with op.batch_alter_table("todos") as batch_op:
batch_op.drop_constraint("fk_todos_owner_id_users", type_="foreignkey")
batch_op.drop_column("owner_id")
# Drop users table
op.drop_index(op.f("ix_users_username"), table_name="users")
op.drop_index(op.f("ix_users_email"), table_name="users")
op.drop_index(op.f("ix_users_id"), table_name="users")
op.drop_table("users")

View File

@ -7,4 +7,7 @@ pydantic-settings>=2.0.3
python-multipart>=0.0.6
ruff>=0.1.3
pytest>=7.4.3
httpx>=0.25.1
httpx>=0.25.1
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
email-validator>=2.1.0