Implement Blogging Platform API with FastAPI and SQLite

- Set up project structure and dependencies
- Create database models for users, posts, comments, and tags
- Set up Alembic for database migrations
- Implement user authentication (register, login)
- Create CRUD endpoints for blog posts, comments, and tags
- Add health check endpoint
- Set up proper error handling
- Update README with project details and setup instructions
This commit is contained in:
Automated Action 2025-06-02 21:58:50 +00:00
parent 9e2b52681a
commit 06df0285b1
46 changed files with 1796 additions and 2 deletions

170
README.md
View File

@ -1,3 +1,169 @@
# FastAPI Application
# Blogging Platform API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A RESTful API for a blogging platform built with FastAPI and SQLite.
## Features
- User authentication (register, login)
- Blog post management (CRUD operations)
- Comment system
- Tag/category support for blog posts
- Health check endpoint
- Proper error handling
- API documentation with Swagger and ReDoc
## Tech Stack
- **FastAPI**: Modern, fast web framework for building APIs with Python
- **SQLAlchemy**: SQL toolkit and Object-Relational Mapping (ORM)
- **SQLite**: Lightweight, serverless database
- **Alembic**: Database migration tool
- **Pydantic**: Data validation and settings management
- **JWT**: JSON Web Tokens for authentication
- **Uvicorn**: ASGI server
## Project Structure
```
.
├── alembic.ini
├── app
│ ├── api
│ │ ├── deps.py
│ │ ├── v1
│ │ │ ├── api.py
│ │ │ └── endpoints
│ │ │ ├── auth.py
│ │ │ ├── comments.py
│ │ │ ├── posts.py
│ │ │ ├── tags.py
│ │ │ └── users.py
│ ├── core
│ │ ├── config.py
│ │ └── security.py
│ ├── crud
│ │ ├── base.py
│ │ ├── crud_comment.py
│ │ ├── crud_post.py
│ │ ├── crud_tag.py
│ │ └── crud_user.py
│ ├── db
│ │ ├── base.py
│ │ ├── deps.py
│ │ └── session.py
│ ├── models
│ │ ├── comment.py
│ │ ├── post.py
│ │ ├── tag.py
│ │ └── user.py
│ ├── schemas
│ │ ├── comment.py
│ │ ├── post.py
│ │ ├── tag.py
│ │ ├── token.py
│ │ └── user.py
│ └── utils
│ └── errors.py
├── main.py
├── migrations
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ └── 0001_initial_migration.py
├── requirements.txt
└── storage
└── db
```
## Setup Instructions
### Prerequisites
- Python 3.8 or higher
- pip (Python package installer)
### Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd blogging-platform-api
```
2. Create a virtual environment (optional but recommended):
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Set up environment variables (optional):
Create a `.env` file in the root directory with the following variables:
```
SECRET_KEY=your-secret-key
ACCESS_TOKEN_EXPIRE_MINUTES=10080 # 7 days
```
5. Run database migrations:
```bash
alembic upgrade head
```
### Running the Application
Start the FastAPI server:
```bash
uvicorn main:app --reload
```
The API will be available at `http://localhost:8000`
## API Documentation
- Swagger UI: `http://localhost:8000/docs`
- ReDoc: `http://localhost:8000/redoc`
- OpenAPI JSON: `http://localhost:8000/openapi.json`
## API Endpoints
### Authentication
- `POST /api/v1/auth/register`: Register a new user
- `POST /api/v1/auth/login`: Login and get access token
### Users
- `GET /api/v1/users/me`: Get current user
- `PUT /api/v1/users/me`: Update current user
### Blog Posts
- `GET /api/v1/posts`: List posts
- `POST /api/v1/posts`: Create a new post
- `GET /api/v1/posts/{id}`: Get a post by ID
- `PUT /api/v1/posts/{id}`: Update a post
- `DELETE /api/v1/posts/{id}`: Delete a post
### Comments
- `GET /api/v1/comments`: List comments
- `POST /api/v1/comments`: Create a new comment
- `GET /api/v1/comments/{id}`: Get a comment by ID
- `PUT /api/v1/comments/{id}`: Update a comment
- `DELETE /api/v1/comments/{id}`: Delete a comment
### Tags
- `GET /api/v1/tags`: List tags
- `POST /api/v1/tags`: Create a new tag
- `GET /api/v1/tags/{id}`: Get a tag by ID
- `PUT /api/v1/tags/{id}`: Update a tag
- `DELETE /api/v1/tags/{id}`: Delete a tag
### Health Check
- `GET /health`: Check if the API is up and running

85
alembic.ini Normal file
View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

10
app/api/deps.py Normal file
View File

@ -0,0 +1,10 @@
from app.db.deps import get_db as db_get_db
from app.db.deps import get_current_user as db_get_current_user
from app.db.deps import get_current_active_user as db_get_current_active_user
# Re-export dependencies from app.db.deps
get_db = db_get_db
get_current_user = db_get_current_user
get_current_active_user = db_get_current_active_user

0
app/api/v1/__init__.py Normal file
View File

11
app/api/v1/api.py Normal file
View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
from app.api.v1.endpoints import auth, users, posts, comments, tags
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(posts.router, prefix="/posts", tags=["posts"])
api_router.include_router(comments.router, prefix="/comments", tags=["comments"])
api_router.include_router(tags.router, prefix="/tags", tags=["tags"])

View File

View File

@ -0,0 +1,73 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud
from app.api import deps
from app.core import security
from app.core.config import settings
from app.schemas.token import Token
from app.schemas.user import User, UserCreate
router = APIRouter()
@router.post("/register", response_model=User)
def register(
*,
db: Session = Depends(deps.get_db),
user_in: UserCreate,
) -> Any:
"""
Register a new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists",
)
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.post("/login", response_model=Token)
def login_access_token(
db: Session = Depends(deps.get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests.
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
if not crud.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}

View File

@ -0,0 +1,120 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app import crud
from app.api import deps
from app.models.user import User
from app.schemas.comment import Comment, CommentCreate, CommentUpdate
router = APIRouter()
@router.get("/", response_model=List[Comment])
def read_comments(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
post_id: int = Query(None, description="Filter comments by post_id"),
) -> Any:
"""
Retrieve comments.
"""
if post_id:
comments = crud.comment.get_multi_by_post(
db, post_id=post_id, skip=skip, limit=limit
)
else:
comments = crud.comment.get_multi_with_author(db, skip=skip, limit=limit)
return comments
@router.post("/", response_model=Comment)
def create_comment(
*,
db: Session = Depends(deps.get_db),
comment_in: CommentCreate,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new comment.
"""
# Check if post exists
post = crud.post.get(db=db, id=comment_in.post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found",
)
comment = crud.comment.create_with_author(
db=db, obj_in=comment_in, author_id=current_user.id
)
return crud.comment.get_with_author(db=db, id=comment.id)
@router.get("/{id}", response_model=Comment)
def read_comment(
*,
db: Session = Depends(deps.get_db),
id: int,
) -> Any:
"""
Get comment by ID.
"""
comment = crud.comment.get_with_author(db=db, id=id)
if not comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Comment not found"
)
return comment
@router.put("/{id}", response_model=Comment)
def update_comment(
*,
db: Session = Depends(deps.get_db),
id: int,
comment_in: CommentUpdate,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a comment.
"""
comment = crud.comment.get(db=db, id=id)
if not comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Comment not found"
)
if comment.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to update this comment",
)
comment = crud.comment.update(db=db, db_obj=comment, obj_in=comment_in)
return crud.comment.get_with_author(db=db, id=comment.id)
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_comment(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a comment.
"""
comment = crud.comment.get(db=db, id=id)
if not comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Comment not found"
)
if comment.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this comment",
)
crud.comment.remove(db=db, id=id)
return None

View File

@ -0,0 +1,106 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud
from app.api import deps
from app.models.user import User
from app.schemas.post import Post, PostCreate, PostUpdate
router = APIRouter()
@router.get("/", response_model=List[Post])
def read_posts(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
author_id: Optional[int] = None,
) -> Any:
"""
Retrieve posts.
"""
if author_id:
posts = crud.post.get_multi_by_author(
db, author_id=author_id, skip=skip, limit=limit
)
else:
posts = crud.post.get_multi_with_details(db, skip=skip, limit=limit)
return posts
@router.post("/", response_model=Post)
def create_post(
*,
db: Session = Depends(deps.get_db),
post_in: PostCreate,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new post.
"""
post = crud.post.create_with_author(
db=db, obj_in=post_in, author_id=current_user.id
)
return crud.post.get_with_details(db=db, id=post.id)
@router.get("/{id}", response_model=Post)
def read_post(
*,
db: Session = Depends(deps.get_db),
id: int,
) -> Any:
"""
Get post by ID.
"""
post = crud.post.get_with_details(db=db, id=id)
if not post:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Post not found")
return post
@router.put("/{id}", response_model=Post)
def update_post(
*,
db: Session = Depends(deps.get_db),
id: int,
post_in: PostUpdate,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a post.
"""
post = crud.post.get(db=db, id=id)
if not post:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Post not found")
if post.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to update this post",
)
post = crud.post.update_with_tags(db=db, db_obj=post, obj_in=post_in)
return crud.post.get_with_details(db=db, id=post.id)
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_post(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a post.
"""
post = crud.post.get(db=db, id=id)
if not post:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Post not found")
if post.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this post",
)
crud.post.remove(db=db, id=id)
return None

View File

@ -0,0 +1,112 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud
from app.api import deps
from app.models.user import User
from app.schemas.tag import Tag, TagCreate, TagUpdate
router = APIRouter()
@router.get("/", response_model=List[Tag])
def read_tags(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Retrieve tags.
"""
tags = crud.tag.get_multi(db, skip=skip, limit=limit)
return tags
@router.post("/", response_model=Tag)
def create_tag(
*,
db: Session = Depends(deps.get_db),
tag_in: TagCreate,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new tag.
"""
# Check if tag already exists
tag = crud.tag.get_by_name(db=db, name=tag_in.name)
if tag:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Tag with this name already exists",
)
tag = crud.tag.create(db=db, obj_in=tag_in)
return tag
@router.get("/{id}", response_model=Tag)
def read_tag(
*,
db: Session = Depends(deps.get_db),
id: int,
) -> Any:
"""
Get tag by ID.
"""
tag = crud.tag.get(db=db, id=id)
if not tag:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Tag not found"
)
return tag
@router.put("/{id}", response_model=Tag)
def update_tag(
*,
db: Session = Depends(deps.get_db),
id: int,
tag_in: TagUpdate,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a tag.
"""
tag = crud.tag.get(db=db, id=id)
if not tag:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Tag not found"
)
# Check if new tag name already exists
if tag_in.name and tag_in.name != tag.name:
existing_tag = crud.tag.get_by_name(db=db, name=tag_in.name)
if existing_tag:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Tag with this name already exists",
)
tag = crud.tag.update(db=db, db_obj=tag, obj_in=tag_in)
return tag
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_tag(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete a tag.
"""
tag = crud.tag.get(db=db, id=id)
if not tag:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Tag not found"
)
crud.tag.remove(db=db, id=id)
return None

View File

@ -0,0 +1,51 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud
from app.api import deps
from app.models.user import User as UserModel
from app.schemas.user import User, UserUpdate
router = APIRouter()
@router.get("/me", response_model=User)
def get_current_user(
current_user: UserModel = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=User)
def update_current_user(
*,
db: Session = Depends(deps.get_db),
user_in: UserUpdate,
current_user: UserModel = Depends(deps.get_current_active_user),
) -> Any:
"""
Update current user.
"""
if user_in.username and user_in.username != current_user.username:
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered",
)
if user_in.email and user_in.email != current_user.email:
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user

0
app/core/__init__.py Normal file
View File

32
app/core/config.py Normal file
View File

@ -0,0 +1,32 @@
from typing import List
from pydantic_settings import BaseSettings
from pydantic import AnyHttpUrl, validator
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Blogging Platform API"
# CORS Origins
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
@validator("BACKEND_CORS_ORIGINS", pre=True)
def assemble_cors_origins(cls, v: str | List[str]) -> List[str] | str:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
# JWT Secret Key
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
# 60 minutes * 24 hours * 7 days = 7 days
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

44
app/core/security.py Normal file
View File

@ -0,0 +1,44 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
"""
Create a JWT token with the given subject.
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a password against a hash.
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Generate a password hash.
"""
return pwd_context.hash(password)

6
app/crud/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from app.crud.crud_user import user
from app.crud.crud_post import post
from app.crud.crud_comment import comment
from app.crud.crud_tag import tag
__all__ = ["user", "post", "comment", "tag"]

66
app/crud/base.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.session import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

63
app/crud/crud_comment.py Normal file
View File

@ -0,0 +1,63 @@
from typing import List, Optional
from sqlalchemy.orm import Session, joinedload
from app.crud.base import CRUDBase
from app.models.comment import Comment
from app.schemas.comment import CommentCreate, CommentUpdate
class CRUDComment(CRUDBase[Comment, CommentCreate, CommentUpdate]):
def create_with_author(
self, db: Session, *, obj_in: CommentCreate, author_id: int
) -> Comment:
obj_in_data = obj_in.model_dump()
db_obj = Comment(**obj_in_data, author_id=author_id)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_multi_by_author(
self, db: Session, *, author_id: int, skip: int = 0, limit: int = 100
) -> List[Comment]:
return (
db.query(Comment)
.filter(Comment.author_id == author_id)
.offset(skip)
.limit(limit)
.all()
)
def get_multi_by_post(
self, db: Session, *, post_id: int, skip: int = 0, limit: int = 100
) -> List[Comment]:
return (
db.query(Comment)
.filter(Comment.post_id == post_id)
.offset(skip)
.limit(limit)
.all()
)
def get_with_author(self, db: Session, *, id: int) -> Optional[Comment]:
return (
db.query(Comment)
.options(joinedload(Comment.author))
.filter(Comment.id == id)
.first()
)
def get_multi_with_author(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[Comment]:
return (
db.query(Comment)
.options(joinedload(Comment.author))
.offset(skip)
.limit(limit)
.all()
)
comment = CRUDComment(Comment)

80
app/crud/crud_post.py Normal file
View File

@ -0,0 +1,80 @@
from typing import List, Optional
from sqlalchemy.orm import Session, joinedload
from app.crud.base import CRUDBase
from app.models.post import Post
from app.models.tag import Tag
from app.schemas.post import PostCreate, PostUpdate
class CRUDPost(CRUDBase[Post, PostCreate, PostUpdate]):
def create_with_author(
self, db: Session, *, obj_in: PostCreate, author_id: int
) -> Post:
obj_in_data = obj_in.model_dump()
tag_ids = obj_in_data.pop("tag_ids", [])
db_obj = Post(**obj_in_data, author_id=author_id)
# Add tags if provided
if tag_ids:
tags = db.query(Tag).filter(Tag.id.in_(tag_ids)).all()
db_obj.tags = tags
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_multi_by_author(
self, db: Session, *, author_id: int, skip: int = 0, limit: int = 100
) -> List[Post]:
return (
db.query(Post)
.filter(Post.author_id == author_id)
.offset(skip)
.limit(limit)
.all()
)
def get_with_details(self, db: Session, *, id: int) -> Optional[Post]:
return (
db.query(Post)
.options(joinedload(Post.author), joinedload(Post.tags))
.filter(Post.id == id)
.first()
)
def get_multi_with_details(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[Post]:
return (
db.query(Post)
.options(joinedload(Post.author), joinedload(Post.tags))
.offset(skip)
.limit(limit)
.all()
)
def update_with_tags(
self, db: Session, *, db_obj: Post, obj_in: PostUpdate
) -> Post:
obj_in_data = obj_in.model_dump(exclude_unset=True)
tag_ids = obj_in_data.pop("tag_ids", None)
# Update basic fields
for field in obj_in_data:
setattr(db_obj, field, obj_in_data[field])
# Update tags if provided
if tag_ids is not None:
tags = db.query(Tag).filter(Tag.id.in_(tag_ids)).all()
db_obj.tags = tags
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
post = CRUDPost(Post)

20
app/crud/crud_tag.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.tag import Tag
from app.schemas.tag import TagCreate, TagUpdate
class CRUDTag(CRUDBase[Tag, TagCreate, TagUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Optional[Tag]:
return db.query(Tag).filter(Tag.name == name).first()
def get_multi_by_names(
self, db: Session, *, names: List[str]
) -> List[Tag]:
return db.query(Tag).filter(Tag.name.in_(names)).all()
tag = CRUDTag(Tag)

55
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,55 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_by_username(self, db: Session, *, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
is_active=obj_in.is_active,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
user = CRUDUser(User)

0
app/db/__init__.py Normal file
View File

2
app/db/base.py Normal file
View File

@ -0,0 +1,2 @@
# Import all the models, so that Base has them before being
# imported by Alembic

65
app/db/deps.py Normal file
View File

@ -0,0 +1,65 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import SessionLocal
from app.models.user import User
from app.schemas.token import TokenPayload
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def get_db() -> Generator:
"""
Dependency for getting a database session.
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
"""
Validate the access token and return the current user.
"""
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = db.query(User).filter(User.id == token_data.sub).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Check if the current user is active.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return current_user

18
app/db/session.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pathlib import Path
# Create the directory for the database if it doesn't exist
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

0
app/models/__init__.py Normal file
View File

20
app/models/comment.py Normal file
View File

@ -0,0 +1,20 @@
from datetime import datetime
from sqlalchemy import Column, Integer, Text, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from app.db.session import Base
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
post_id = Column(Integer, ForeignKey("posts.id", ondelete="CASCADE"), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
author = relationship("User", back_populates="comments")
post = relationship("Post", back_populates="comments")

23
app/models/post.py Normal file
View File

@ -0,0 +1,23 @@
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from app.db.session import Base
from app.models.tag import post_tag
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
published = Column(Boolean, default=True)
author_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
author = relationship("User", back_populates="posts")
comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
tags = relationship("Tag", secondary=post_tag, back_populates="posts")

22
app/models/tag.py Normal file
View File

@ -0,0 +1,22 @@
from sqlalchemy import Column, Integer, String, Table, ForeignKey
from sqlalchemy.orm import relationship
from app.db.session import Base
# Association table for the many-to-many relationship between posts and tags
post_tag = Table(
"post_tag",
Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
class Tag(Base):
__tablename__ = "tags"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), unique=True, index=True, nullable=False)
# Relationships
posts = relationship("Post", secondary=post_tag, back_populates="tags")

21
app/models/user.py Normal file
View File

@ -0,0 +1,21 @@
from datetime import datetime
from sqlalchemy import Boolean, Column, Integer, String, DateTime
from sqlalchemy.orm import relationship
from app.db.session import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")

0
app/schemas/__init__.py Normal file
View File

45
app/schemas/comment.py Normal file
View File

@ -0,0 +1,45 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from app.schemas.user import User
# Shared properties
class CommentBase(BaseModel):
content: Optional[str] = None
# Properties to receive via API on creation
class CommentCreate(CommentBase):
content: str
post_id: int
# Properties to receive via API on update
class CommentUpdate(CommentBase):
pass
# Properties shared by models stored in DB
class CommentInDBBase(CommentBase):
id: int
content: str
author_id: int
post_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Properties to return to client
class Comment(CommentInDBBase):
author: User
# Properties stored in DB
class CommentInDB(CommentInDBBase):
pass

50
app/schemas/post.py Normal file
View File

@ -0,0 +1,50 @@
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel
from app.schemas.user import User
from app.schemas.tag import Tag
# Shared properties
class PostBase(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
published: Optional[bool] = True
# Properties to receive via API on creation
class PostCreate(PostBase):
title: str
content: str
tag_ids: Optional[List[int]] = []
# Properties to receive via API on update
class PostUpdate(PostBase):
tag_ids: Optional[List[int]] = None
# Properties shared by models stored in DB
class PostInDBBase(PostBase):
id: int
title: str
content: str
author_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Properties to return to client
class Post(PostInDBBase):
author: User
tags: List[Tag] = []
# Properties stored in DB
class PostInDB(PostInDBBase):
pass

37
app/schemas/tag.py Normal file
View File

@ -0,0 +1,37 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class TagBase(BaseModel):
name: Optional[str] = None
# Properties to receive via API on creation
class TagCreate(TagBase):
name: str
# Properties to receive via API on update
class TagUpdate(TagBase):
pass
# Properties shared by models stored in DB
class TagInDBBase(TagBase):
id: int
name: str
class Config:
from_attributes = True
# Properties to return to client
class Tag(TagInDBBase):
pass
# Properties stored in DB
class TagInDB(TagInDBBase):
pass

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

45
app/schemas/user.py Normal file
View File

@ -0,0 +1,45 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
is_active: Optional[bool] = True
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
username: str
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
# Properties shared by models stored in DB
class UserInDBBase(UserBase):
id: int
email: EmailStr
username: str
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Properties to return to client
class User(UserInDBBase):
pass
# Properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

0
app/utils/__init__.py Normal file
View File

55
app/utils/errors.py Normal file
View File

@ -0,0 +1,55 @@
from fastapi import HTTPException, status
class BlogAPIError(HTTPException):
"""Base class for all API errors."""
def __init__(
self,
status_code: int,
detail: str,
headers: dict = None,
):
super().__init__(status_code=status_code, detail=detail, headers=headers)
class NotFoundError(BlogAPIError):
"""Raised when a resource is not found."""
def __init__(self, detail: str = "Resource not found"):
super().__init__(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
class ForbiddenError(BlogAPIError):
"""Raised when a user is not allowed to access a resource."""
def __init__(self, detail: str = "You don't have permission to access this resource"):
super().__init__(status_code=status.HTTP_403_FORBIDDEN, detail=detail)
class UnauthorizedError(BlogAPIError):
"""Raised when a user is not authenticated."""
def __init__(self, detail: str = "Not authenticated"):
super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)
class BadRequestError(BlogAPIError):
"""Raised when a request is malformed or invalid."""
def __init__(self, detail: str = "Bad request"):
super().__init__(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
class ConflictError(BlogAPIError):
"""Raised when a resource already exists."""
def __init__(self, detail: str = "Resource already exists"):
super().__init__(status_code=status.HTTP_409_CONFLICT, detail=detail)
class ValidationError(BlogAPIError):
"""Raised when validation fails."""
def __init__(self, detail: str = "Validation error"):
super().__init__(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail)

64
main.py Normal file
View File

@ -0,0 +1,64 @@
import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from sqlalchemy.exc import SQLAlchemyError
from app.api.v1.api import api_router
from app.core.config import settings
from app.utils.errors import BlogAPIError
app = FastAPI(
title=settings.PROJECT_NAME,
description="Blogging Platform API",
version="0.1.0",
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Exception handlers
@app.exception_handler(BlogAPIError)
async def blog_api_exception_handler(request: Request, exc: BlogAPIError):
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail},
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={"detail": "Validation error", "errors": exc.errors()},
)
@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_exception_handler(request: Request, exc: SQLAlchemyError):
return JSONResponse(
status_code=500,
content={"detail": "Database error", "error": str(exc)},
)
# Set up CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(api_router, prefix=settings.API_V1_STR)
# Health check endpoint
@app.get("/health", tags=["health"])
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/__init__.py Normal file
View File

@ -0,0 +1 @@
# Init file for alembic

80
migrations/env.py Normal file
View File

@ -0,0 +1,80 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.db.base import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # Key configuration for SQLite
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,99 @@
"""Initial migration
Revision ID: 0001
Revises:
Create Date: 2023-11-01
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '0001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create users table
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
# Create posts table
op.create_table(
'posts',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(length=255), nullable=False),
sa.Column('content', sa.Text(), nullable=False),
sa.Column('published', sa.Boolean(), nullable=True),
sa.Column('author_id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['author_id'], ['users.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_posts_id'), 'posts', ['id'], unique=False)
# Create tags table
op.create_table(
'tags',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_tags_id'), 'tags', ['id'], unique=False)
op.create_index(op.f('ix_tags_name'), 'tags', ['name'], unique=True)
# Create comments table
op.create_table(
'comments',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('content', sa.Text(), nullable=False),
sa.Column('author_id', sa.Integer(), nullable=False),
sa.Column('post_id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['author_id'], ['users.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['post_id'], ['posts.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_comments_id'), 'comments', ['id'], unique=False)
# Create post_tag association table
op.create_table(
'post_tag',
sa.Column('post_id', sa.Integer(), nullable=False),
sa.Column('tag_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['post_id'], ['posts.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['tag_id'], ['tags.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('post_id', 'tag_id'),
)
def downgrade():
op.drop_table('post_tag')
op.drop_index(op.f('ix_comments_id'), table_name='comments')
op.drop_table('comments')
op.drop_index(op.f('ix_tags_name'), table_name='tags')
op.drop_index(op.f('ix_tags_id'), table_name='tags')
op.drop_table('tags')
op.drop_index(op.f('ix_posts_id'), table_name='posts')
op.drop_table('posts')
op.drop_index(op.f('ix_users_username'), table_name='users')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

View File

@ -0,0 +1 @@
# Init file for alembic versions

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi>=0.104.0
uvicorn>=0.23.2
sqlalchemy>=2.0.23
alembic>=1.12.1
pydantic>=2.4.2
pydantic-settings>=2.0.3
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.6
email-validator>=2.1.0
ruff>=0.1.3