Implement Insurance System API with FastAPI and SQLite

This commit is contained in:
Automated Action 2025-06-09 08:10:19 +00:00
parent 31b326d287
commit 77c36f1dfc
43 changed files with 1582 additions and 2 deletions

111
README.md
View File

@ -1,3 +1,110 @@
# FastAPI Application
# Insurance System API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive API for managing insurance policies, customers, and claims built with FastAPI and SQLite.
## Features
- User authentication and authorization with JWT tokens
- CRUD operations for customers, policies, and claims
- Comprehensive database models with relationships
- API documentation with Swagger UI and ReDoc
- SQLite database with Alembic migrations
## Requirements
- Python 3.8+
- SQLite
## Environment Variables
The application uses the following environment variables:
- `SECRET_KEY`: JWT secret key (default: "supersecretkey")
- `ACCESS_TOKEN_EXPIRE_MINUTES`: JWT token expiration time in minutes (default: 30)
## Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd insurance-system-api
```
2. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Run database migrations:
```bash
alembic upgrade head
```
## Running the Application
Start the application with:
```bash
uvicorn main:app --reload
```
The API will be available at http://localhost:8000.
- API documentation: http://localhost:8000/docs
- Alternative API documentation: http://localhost:8000/redoc
- OpenAPI schema: http://localhost:8000/openapi.json
## API Endpoints
### Authentication
- `POST /api/v1/auth/register`: Register a new user
- `POST /api/v1/auth/token`: Login and get access token
- `GET /api/v1/auth/me`: Get current user information
### Customers
- `GET /api/v1/customers`: Get all customers
- `POST /api/v1/customers`: Create a new customer
- `GET /api/v1/customers/{customer_id}`: Get a specific customer
- `PUT /api/v1/customers/{customer_id}`: Update a customer
- `DELETE /api/v1/customers/{customer_id}`: Delete a customer
### Policies
- `GET /api/v1/policies`: Get all policies
- `POST /api/v1/policies`: Create a new policy
- `GET /api/v1/policies/{policy_id}`: Get a specific policy
- `PUT /api/v1/policies/{policy_id}`: Update a policy
- `DELETE /api/v1/policies/{policy_id}`: Delete a policy
### Claims
- `GET /api/v1/claims`: Get all claims
- `POST /api/v1/claims`: Create a new claim
- `GET /api/v1/claims/{claim_id}`: Get a specific claim
- `PUT /api/v1/claims/{claim_id}`: Update a claim
- `DELETE /api/v1/claims/{claim_id}`: Delete a claim
## Database Schema
The application uses the following database models:
- `User`: Authentication and system users
- `Customer`: Insurance customers
- `Policy`: Insurance policies
- `Claim`: Insurance claims
## License
This project is licensed under the MIT License - see the LICENSE file for details.

102
alembic.ini Normal file
View File

@ -0,0 +1,102 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example - using absolute path per project requirements
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
# Insurance System API package

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
# API package

54
app/api/deps.py Normal file
View File

@ -0,0 +1,54 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core.config import settings
from app.db.session import SessionLocal
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/token")
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return current_user

1
app/api/v1/__init__.py Normal file
View File

@ -0,0 +1 @@
# API v1 package

11
app/api/v1/api.py Normal file
View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
from app.api.v1.endpoints import customers, policies, claims, auth
api_router = APIRouter()
# Include routers for different resources
api_router.include_router(auth.router, prefix="/auth", tags=["Authentication"])
api_router.include_router(customers.router, prefix="/customers", tags=["Customers"])
api_router.include_router(policies.router, prefix="/policies", tags=["Policies"])
api_router.include_router(claims.router, prefix="/claims", tags=["Claims"])

View File

@ -0,0 +1 @@
# API endpoints package

View File

@ -0,0 +1,67 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api.deps import get_current_active_user, get_db
from app.core.config import settings
from app.core.security import create_access_token
router = APIRouter()
@router.post("/token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
elif not crud.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=schemas.User, status_code=status.HTTP_201_CREATED)
def register_user(
*,
db: Session = Depends(get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Register a new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_users_me(
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user

View File

@ -0,0 +1,115 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, models
from app.api.deps import get_current_active_user, get_db
from app.schemas.claim import Claim, ClaimCreate, ClaimUpdate
router = APIRouter()
@router.get("/", response_model=List[Claim])
def read_claims(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
policy_id: Optional[int] = None,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve claims.
"""
if policy_id:
claims = crud.claim.get_by_policy(
db, policy_id=policy_id, skip=skip, limit=limit
)
else:
claims = crud.claim.get_multi(db, skip=skip, limit=limit)
return claims
@router.post("/", response_model=Claim, status_code=status.HTTP_201_CREATED)
def create_claim(
*,
db: Session = Depends(get_db),
claim_in: ClaimCreate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Create new claim.
"""
# Check if policy exists
policy = crud.policy.get(db, id=claim_in.policy_id)
if not policy:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Policy not found",
)
# Check if claim number already exists
claim = crud.claim.get_by_claim_number(db, claim_number=claim_in.claim_number)
if claim:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A claim with this claim number already exists",
)
claim = crud.claim.create(db, obj_in=claim_in)
return claim
@router.get("/{claim_id}", response_model=Claim)
def read_claim(
*,
db: Session = Depends(get_db),
claim_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Get claim by ID.
"""
claim = crud.claim.get(db, id=claim_id)
if not claim:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Claim not found",
)
return claim
@router.put("/{claim_id}", response_model=Claim)
def update_claim(
*,
db: Session = Depends(get_db),
claim_id: int,
claim_in: ClaimUpdate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Update a claim.
"""
claim = crud.claim.get(db, id=claim_id)
if not claim:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Claim not found",
)
claim = crud.claim.update(db, db_obj=claim, obj_in=claim_in)
return claim
@router.delete("/{claim_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_claim(
*,
db: Session = Depends(get_db),
claim_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Delete a claim.
"""
claim = crud.claim.get(db, id=claim_id)
if not claim:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Claim not found",
)
crud.claim.remove(db, id=claim_id)
return None

View File

@ -0,0 +1,109 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, models
from app.api.deps import get_current_active_user, get_db
from app.schemas.customer import Customer, CustomerCreate, CustomerUpdate
router = APIRouter()
@router.get("/", response_model=List[Customer])
def read_customers(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve customers.
"""
customers = crud.customer.get_multi(db, skip=skip, limit=limit)
return customers
@router.post("/", response_model=Customer, status_code=status.HTTP_201_CREATED)
def create_customer(
*,
db: Session = Depends(get_db),
customer_in: CustomerCreate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Create new customer.
"""
customer = crud.customer.get_by_email(db, email=customer_in.email)
if customer:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A customer with this email already exists in the system.",
)
customer = crud.customer.create(db, obj_in=customer_in)
return customer
@router.get("/{customer_id}", response_model=Customer)
def read_customer(
*,
db: Session = Depends(get_db),
customer_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Get customer by ID.
"""
customer = crud.customer.get(db, id=customer_id)
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found",
)
return customer
@router.put("/{customer_id}", response_model=Customer)
def update_customer(
*,
db: Session = Depends(get_db),
customer_id: int,
customer_in: CustomerUpdate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Update a customer.
"""
customer = crud.customer.get(db, id=customer_id)
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found",
)
# Check if email exists and it's not the customer's own email
if customer_in.email and customer_in.email != customer.email:
db_customer = crud.customer.get_by_email(db, email=customer_in.email)
if db_customer:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A customer with this email already exists in the system.",
)
customer = crud.customer.update(db, db_obj=customer, obj_in=customer_in)
return customer
@router.delete("/{customer_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_customer(
*,
db: Session = Depends(get_db),
customer_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Delete a customer.
"""
customer = crud.customer.get(db, id=customer_id)
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found",
)
crud.customer.remove(db, id=customer_id)
return None

View File

@ -0,0 +1,115 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, models
from app.api.deps import get_current_active_user, get_db
from app.schemas.policy import Policy, PolicyCreate, PolicyUpdate
router = APIRouter()
@router.get("/", response_model=List[Policy])
def read_policies(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
customer_id: Optional[int] = None,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve policies.
"""
if customer_id:
policies = crud.policy.get_by_customer(
db, customer_id=customer_id, skip=skip, limit=limit
)
else:
policies = crud.policy.get_multi(db, skip=skip, limit=limit)
return policies
@router.post("/", response_model=Policy, status_code=status.HTTP_201_CREATED)
def create_policy(
*,
db: Session = Depends(get_db),
policy_in: PolicyCreate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Create new policy.
"""
# Check if customer exists
customer = crud.customer.get(db, id=policy_in.customer_id)
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found",
)
# Check if policy number already exists
policy = crud.policy.get_by_policy_number(db, policy_number=policy_in.policy_number)
if policy:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A policy with this policy number already exists",
)
policy = crud.policy.create(db, obj_in=policy_in)
return policy
@router.get("/{policy_id}", response_model=Policy)
def read_policy(
*,
db: Session = Depends(get_db),
policy_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Get policy by ID.
"""
policy = crud.policy.get(db, id=policy_id)
if not policy:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Policy not found",
)
return policy
@router.put("/{policy_id}", response_model=Policy)
def update_policy(
*,
db: Session = Depends(get_db),
policy_id: int,
policy_in: PolicyUpdate,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Update a policy.
"""
policy = crud.policy.get(db, id=policy_id)
if not policy:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Policy not found",
)
policy = crud.policy.update(db, db_obj=policy, obj_in=policy_in)
return policy
@router.delete("/{policy_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_policy(
*,
db: Session = Depends(get_db),
policy_id: int,
current_user: models.User = Depends(get_current_active_user),
) -> Any:
"""
Delete a policy.
"""
policy = crud.policy.get(db, id=policy_id)
if not policy:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Policy not found",
)
crud.policy.remove(db, id=policy_id)
return None

1
app/auth/__init__.py Normal file
View File

@ -0,0 +1 @@
# Authentication package

1
app/core/__init__.py Normal file
View File

@ -0,0 +1 @@
# Core package

33
app/core/config.py Normal file
View File

@ -0,0 +1,33 @@
import os
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# API Settings
PROJECT_NAME: str = "Insurance System API"
PROJECT_DESCRIPTION: str = "API for managing insurance policies, customers, and claims"
PROJECT_VERSION: str = "0.1.0"
API_V1_STR: str = "/api/v1"
# Database Settings
DB_DIR: Path = Path("/app") / "storage" / "db"
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# Security Settings
SECRET_KEY: str = os.getenv("SECRET_KEY", "supersecretkey")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS Settings
BACKEND_CORS_ORIGINS: List[str] = ["*"]
class Config:
env_file = ".env"
case_sensitive = True
# Create settings instance
settings = Settings()
# Ensure database directory exists
settings.DB_DIR.mkdir(parents=True, exist_ok=True)

28
app/core/security.py Normal file
View File

@ -0,0 +1,28 @@
from datetime import datetime, timedelta
from typing import Any, Union, Optional
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

7
app/crud/__init__.py Normal file
View File

@ -0,0 +1,7 @@
# CRUD operations package
from app.crud.crud_user import user
from app.crud.crud_customer import customer
from app.crud.crud_policy import policy
from app.crud.crud_claim import claim
__all__ = ["user", "customer", "policy", "claim"]

65
app/crud/base.py Normal file
View File

@ -0,0 +1,65 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

31
app/crud/crud_claim.py Normal file
View File

@ -0,0 +1,31 @@
from typing import Optional, List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.claim import Claim
from app.schemas.claim import ClaimCreate, ClaimUpdate
class CRUDClaim(CRUDBase[Claim, ClaimCreate, ClaimUpdate]):
def get_by_claim_number(self, db: Session, *, claim_number: str) -> Optional[Claim]:
return db.query(Claim).filter(Claim.claim_number == claim_number).first()
def get_by_policy(self, db: Session, *, policy_id: int, skip: int = 0, limit: int = 100) -> List[Claim]:
return db.query(Claim).filter(Claim.policy_id == policy_id).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: ClaimCreate) -> Claim:
db_obj = Claim(
claim_number=obj_in.claim_number,
policy_id=obj_in.policy_id,
incident_date=obj_in.incident_date,
description=obj_in.description,
claim_amount=obj_in.claim_amount,
status=obj_in.status,
settlement_amount=obj_in.settlement_amount,
settlement_date=obj_in.settlement_date
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
claim = CRUDClaim(Claim)

27
app/crud/crud_customer.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.customer import Customer
from app.schemas.customer import CustomerCreate, CustomerUpdate
class CRUDCustomer(CRUDBase[Customer, CustomerCreate, CustomerUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[Customer]:
return db.query(Customer).filter(Customer.email == email).first()
def create(self, db: Session, *, obj_in: CustomerCreate) -> Customer:
# Create a dict without policies (which are handled separately)
db_obj = Customer(
first_name=obj_in.first_name,
last_name=obj_in.last_name,
email=obj_in.email,
phone_number=obj_in.phone_number,
address=obj_in.address,
date_of_birth=obj_in.date_of_birth
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
customer = CRUDCustomer(Customer)

33
app/crud/crud_policy.py Normal file
View File

@ -0,0 +1,33 @@
from typing import Optional, List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.policy import Policy
from app.schemas.policy import PolicyCreate, PolicyUpdate
class CRUDPolicy(CRUDBase[Policy, PolicyCreate, PolicyUpdate]):
def get_by_policy_number(self, db: Session, *, policy_number: str) -> Optional[Policy]:
return db.query(Policy).filter(Policy.policy_number == policy_number).first()
def get_by_customer(self, db: Session, *, customer_id: int, skip: int = 0, limit: int = 100) -> List[Policy]:
return db.query(Policy).filter(Policy.customer_id == customer_id).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: PolicyCreate) -> Policy:
db_obj = Policy(
policy_number=obj_in.policy_number,
customer_id=obj_in.customer_id,
policy_type=obj_in.policy_type,
status=obj_in.status,
start_date=obj_in.start_date,
end_date=obj_in.end_date,
premium_amount=obj_in.premium_amount,
coverage_amount=obj_in.coverage_amount,
deductible_amount=obj_in.deductible_amount,
description=obj_in.description
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
policy = CRUDPolicy(Policy)

54
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,54 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=False,
is_active=True,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

1
app/db/__init__.py Normal file
View File

@ -0,0 +1 @@
# Database package

5
app/db/base.py Normal file
View File

@ -0,0 +1,5 @@
# Import all the models to ensure they are registered with SQLAlchemy
from sqlalchemy.ext.declarative import declarative_base
# Create Base class for SQLAlchemy models
Base = declarative_base()

21
app/db/session.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Create SQLAlchemy engine
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # Only needed for SQLite
)
# Create sessionmaker
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Dependency to get DB session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

7
app/models/__init__.py Normal file
View File

@ -0,0 +1,7 @@
# Database models package
from app.models.user import User
from app.models.customer import Customer
from app.models.policy import Policy, PolicyType, PolicyStatus
from app.models.claim import Claim, ClaimStatus
__all__ = ["User", "Customer", "Policy", "PolicyType", "PolicyStatus", "Claim", "ClaimStatus"]

33
app/models/claim.py Normal file
View File

@ -0,0 +1,33 @@
from sqlalchemy import Column, Integer, String, DateTime, Float, ForeignKey, Enum
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import enum
from app.db.base import Base
class ClaimStatus(str, enum.Enum):
SUBMITTED = "submitted"
UNDER_REVIEW = "under_review"
APPROVED = "approved"
DENIED = "denied"
PAID = "paid"
CLOSED = "closed"
class Claim(Base):
__tablename__ = "claims"
id = Column(Integer, primary_key=True, index=True)
claim_number = Column(String, unique=True, index=True, nullable=False)
policy_id = Column(Integer, ForeignKey("policies.id"), nullable=False)
incident_date = Column(DateTime, nullable=False)
report_date = Column(DateTime, nullable=False, server_default=func.now())
description = Column(String, nullable=False)
claim_amount = Column(Float, nullable=False)
status = Column(Enum(ClaimStatus), default=ClaimStatus.SUBMITTED, nullable=False)
settlement_amount = Column(Float, nullable=True)
settlement_date = Column(DateTime, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
policy = relationship("Policy", back_populates="claims")

21
app/models/customer.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.base import Base
class Customer(Base):
__tablename__ = "customers"
id = Column(Integer, primary_key=True, index=True)
first_name = Column(String, nullable=False)
last_name = Column(String, nullable=False)
email = Column(String, unique=True, index=True, nullable=False)
phone_number = Column(String, nullable=True)
address = Column(String, nullable=True)
date_of_birth = Column(DateTime, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
policies = relationship("Policy", back_populates="customer", cascade="all, delete-orphan")

40
app/models/policy.py Normal file
View File

@ -0,0 +1,40 @@
from sqlalchemy import Column, Integer, String, DateTime, Float, ForeignKey, Enum
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import enum
from app.db.base import Base
class PolicyType(str, enum.Enum):
AUTO = "auto"
HOME = "home"
LIFE = "life"
HEALTH = "health"
TRAVEL = "travel"
class PolicyStatus(str, enum.Enum):
ACTIVE = "active"
EXPIRED = "expired"
CANCELLED = "cancelled"
PENDING = "pending"
class Policy(Base):
__tablename__ = "policies"
id = Column(Integer, primary_key=True, index=True)
policy_number = Column(String, unique=True, index=True, nullable=False)
customer_id = Column(Integer, ForeignKey("customers.id"), nullable=False)
policy_type = Column(Enum(PolicyType), nullable=False)
status = Column(Enum(PolicyStatus), default=PolicyStatus.PENDING, nullable=False)
start_date = Column(DateTime, nullable=False)
end_date = Column(DateTime, nullable=False)
premium_amount = Column(Float, nullable=False)
coverage_amount = Column(Float, nullable=False)
deductible_amount = Column(Float, nullable=True)
description = Column(String, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
customer = relationship("Customer", back_populates="policies")
claims = relationship("Claim", back_populates="policy", cascade="all, delete-orphan")

16
app/models/user.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Boolean, Column, Integer, String, DateTime
from sqlalchemy.sql import func
from app.db.base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

14
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,14 @@
# Pydantic schemas package
from app.schemas.user import User, UserCreate, UserUpdate, UserInDB
from app.schemas.customer import Customer, CustomerCreate, CustomerUpdate, CustomerInDB
from app.schemas.policy import Policy, PolicyCreate, PolicyUpdate, PolicyInDB
from app.schemas.claim import Claim, ClaimCreate, ClaimUpdate, ClaimInDB
from app.schemas.token import Token, TokenPayload
__all__ = [
"User", "UserCreate", "UserUpdate", "UserInDB",
"Customer", "CustomerCreate", "CustomerUpdate", "CustomerInDB",
"Policy", "PolicyCreate", "PolicyUpdate", "PolicyInDB",
"Claim", "ClaimCreate", "ClaimUpdate", "ClaimInDB",
"Token", "TokenPayload"
]

46
app/schemas/claim.py Normal file
View File

@ -0,0 +1,46 @@
from typing import Optional
from datetime import datetime, date
from pydantic import BaseModel, Field
from app.models.claim import ClaimStatus
# Shared properties
class ClaimBase(BaseModel):
claim_number: str
policy_id: int
incident_date: date
description: str
claim_amount: float = Field(..., gt=0)
status: ClaimStatus = ClaimStatus.SUBMITTED
settlement_amount: Optional[float] = Field(None, ge=0)
settlement_date: Optional[date] = None
# Properties to receive via API on creation
class ClaimCreate(ClaimBase):
pass
# Properties to receive via API on update
class ClaimUpdate(BaseModel):
description: Optional[str] = None
claim_amount: Optional[float] = Field(None, gt=0)
status: Optional[ClaimStatus] = None
settlement_amount: Optional[float] = Field(None, ge=0)
settlement_date: Optional[date] = None
# Properties shared by models stored in DB
class ClaimInDBBase(ClaimBase):
id: int
report_date: datetime
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Claim(ClaimInDBBase):
pass
# Additional properties stored in DB
class ClaimInDB(ClaimInDBBase):
pass

39
app/schemas/customer.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from datetime import datetime, date
from pydantic import BaseModel, EmailStr
# Shared properties
class CustomerBase(BaseModel):
first_name: str
last_name: str
email: EmailStr
phone_number: Optional[str] = None
address: Optional[str] = None
date_of_birth: Optional[date] = None
# Properties to receive via API on creation
class CustomerCreate(CustomerBase):
pass
# Properties to receive via API on update
class CustomerUpdate(CustomerBase):
first_name: Optional[str] = None
last_name: Optional[str] = None
email: Optional[EmailStr] = None
# Properties shared by models stored in DB
class CustomerInDBBase(CustomerBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Customer(CustomerInDBBase):
pass
# Additional properties stored in DB
class CustomerInDB(CustomerInDBBase):
pass

50
app/schemas/policy.py Normal file
View File

@ -0,0 +1,50 @@
from typing import Optional
from datetime import datetime, date
from pydantic import BaseModel, Field
from app.models.policy import PolicyType, PolicyStatus
# Shared properties
class PolicyBase(BaseModel):
policy_number: str
customer_id: int
policy_type: PolicyType
status: PolicyStatus = PolicyStatus.PENDING
start_date: date
end_date: date
premium_amount: float = Field(..., gt=0)
coverage_amount: float = Field(..., gt=0)
deductible_amount: Optional[float] = Field(None, ge=0)
description: Optional[str] = None
# Properties to receive via API on creation
class PolicyCreate(PolicyBase):
pass
# Properties to receive via API on update
class PolicyUpdate(BaseModel):
policy_type: Optional[PolicyType] = None
status: Optional[PolicyStatus] = None
start_date: Optional[date] = None
end_date: Optional[date] = None
premium_amount: Optional[float] = Field(None, gt=0)
coverage_amount: Optional[float] = Field(None, gt=0)
deductible_amount: Optional[float] = Field(None, ge=0)
description: Optional[str] = None
# Properties shared by models stored in DB
class PolicyInDBBase(PolicyBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Policy(PolicyInDBBase):
pass
# Additional properties stored in DB
class PolicyInDB(PolicyInDBBase):
pass

9
app/schemas/token.py Normal file
View File

@ -0,0 +1,9 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

34
app/schemas/user.py Normal file
View File

@ -0,0 +1,34 @@
from typing import Optional
from datetime import datetime
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: EmailStr
full_name: Optional[str] = None
is_active: bool = True
# Properties to receive via API on creation
class UserCreate(UserBase):
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
# Properties shared by models stored in DB
class UserInDBBase(UserBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB but not returned by API
class UserInDB(UserInDBBase):
hashed_password: str

45
main.py Normal file
View File

@ -0,0 +1,45 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from app.api.v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description=settings.PROJECT_DESCRIPTION,
version=settings.PROJECT_VERSION,
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix="/api/v1")
@app.get("/", response_class=JSONResponse)
async def root():
"""Root endpoint - Returns basic service information."""
return {
"title": settings.PROJECT_NAME,
"docs": "/docs",
"health": "/health"
}
@app.get("/health", response_class=JSONResponse)
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/__init__.py Normal file
View File

@ -0,0 +1 @@
# Migrations package

85
migrations/env.py Normal file
View File

@ -0,0 +1,85 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# Import the Base model for the database schema
from app.db.base import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # This is important for SQLite migrations
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,113 @@
"""Initial migration
Revision ID: 1_initial_migration
Revises:
Create Date: 2023-08-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1_initial_migration'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create users table
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
# Create customers table
op.create_table(
'customers',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('first_name', sa.String(), nullable=False),
sa.Column('last_name', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('phone_number', sa.String(), nullable=True),
sa.Column('address', sa.String(), nullable=True),
sa.Column('date_of_birth', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_customers_email'), 'customers', ['email'], unique=True)
op.create_index(op.f('ix_customers_id'), 'customers', ['id'], unique=False)
# Create policies table
op.create_table(
'policies',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('policy_number', sa.String(), nullable=False),
sa.Column('customer_id', sa.Integer(), nullable=False),
sa.Column('policy_type', sa.Enum('auto', 'home', 'life', 'health', 'travel', name='policytype'), nullable=False),
sa.Column('status', sa.Enum('active', 'expired', 'cancelled', 'pending', name='policystatus'), nullable=False),
sa.Column('start_date', sa.DateTime(), nullable=False),
sa.Column('end_date', sa.DateTime(), nullable=False),
sa.Column('premium_amount', sa.Float(), nullable=False),
sa.Column('coverage_amount', sa.Float(), nullable=False),
sa.Column('deductible_amount', sa.Float(), nullable=True),
sa.Column('description', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['customer_id'], ['customers.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_policies_id'), 'policies', ['id'], unique=False)
op.create_index(op.f('ix_policies_policy_number'), 'policies', ['policy_number'], unique=True)
# Create claims table
op.create_table(
'claims',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('claim_number', sa.String(), nullable=False),
sa.Column('policy_id', sa.Integer(), nullable=False),
sa.Column('incident_date', sa.DateTime(), nullable=False),
sa.Column('report_date', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('claim_amount', sa.Float(), nullable=False),
sa.Column('status', sa.Enum('submitted', 'under_review', 'approved', 'denied', 'paid', 'closed', name='claimstatus'), nullable=False),
sa.Column('settlement_amount', sa.Float(), nullable=True),
sa.Column('settlement_date', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['policy_id'], ['policies.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_claims_claim_number'), 'claims', ['claim_number'], unique=True)
op.create_index(op.f('ix_claims_id'), 'claims', ['id'], unique=False)
def downgrade() -> None:
# Drop tables in reverse order
op.drop_index(op.f('ix_claims_id'), table_name='claims')
op.drop_index(op.f('ix_claims_claim_number'), table_name='claims')
op.drop_table('claims')
op.drop_index(op.f('ix_policies_policy_number'), table_name='policies')
op.drop_index(op.f('ix_policies_id'), table_name='policies')
op.drop_table('policies')
op.drop_index(op.f('ix_customers_id'), table_name='customers')
op.drop_index(op.f('ix_customers_email'), table_name='customers')
op.drop_table('customers')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

View File

@ -0,0 +1 @@
# Migration versions package

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi>=0.100.0
uvicorn>=0.22.0
sqlalchemy>=2.0.0
alembic>=1.11.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
passlib[bcrypt]>=1.7.4
python-jose[cryptography]>=3.3.0
python-multipart>=0.0.6
email-validator>=2.0.0
ruff>=0.0.275