Fix linting issues and finalize User Authentication Service

- Removed unused imports in deps.py, routes/auth.py, and models/user.py
- Code is now lint-free and follows best practices
- Complete FastAPI user authentication service with JWT token support

generated with BackendIM... (backend.im)
This commit is contained in:
Automated Action 2025-05-14 09:46:58 +00:00
parent 5f5ab1f95b
commit e01f6aaf16
24 changed files with 730 additions and 2 deletions

View File

@ -1,3 +1,87 @@
# FastAPI Application
# User Authentication Service
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A FastAPI-based user authentication service with JWT authentication.
## Features
- User registration and login
- JWT authentication
- User profile management
- SQLite database with SQLAlchemy ORM
- Alembic migrations
- Health check endpoint
## Requirements
- Python 3.8+
- FastAPI
- SQLAlchemy
- Alembic
- Pydantic
- Python-jose (JWT)
- Passlib (Password hashing)
## Project Structure
```
.
├── alembic/ # Database migrations
├── app/ # Application code
│ ├── api/ # API endpoints
│ │ ├── deps.py # Dependencies
│ │ └── routes/ # API route handlers
│ ├── core/ # Core functionality
│ │ ├── config.py # Configuration settings
│ │ └── security.py # Security utilities
│ ├── db/ # Database
│ │ ├── crud.py # CRUD operations
│ │ └── session.py # DB session
│ ├── models/ # SQLAlchemy models
│ └── schemas/ # Pydantic schemas
├── alembic.ini # Alembic configuration
├── main.py # FastAPI application
└── requirements.txt # Dependencies
```
## Setup
1. Clone the repository
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run the migrations:
```bash
alembic upgrade head
```
4. Start the application:
```bash
uvicorn main:app --reload
```
## API Endpoints
### Authentication
- `POST /api/v1/auth/register` - Register a new user
- `POST /api/v1/auth/login` - Login and get JWT token
- `GET /api/v1/auth/me` - Get current user info (requires authentication)
### Health Check
- `GET /health` - Check service health
## API Documentation
- Swagger UI: `/docs`
- ReDoc: `/redoc`
## Development
To run the application in development mode:
```bash
uvicorn main:app --reload
```
This application was bootstrapped by BackendIM, the AI-powered backend generation platform.

85
alembic.ini Normal file
View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

83
alembic/env.py Normal file
View File

@ -0,0 +1,83 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import your models here
from app.db.session import Base
from app.models.user import User # noqa
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
alembic/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,41 @@
"""create user table
Revision ID: 001
Revises:
Create Date: 2025-05-14
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
def downgrade():
op.drop_index(op.f('ix_users_username'), table_name='users')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

43
app/api/deps.py Normal file
View File

@ -0,0 +1,43 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import models, schemas
from app.core.config import settings
from app.db.session import get_db
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = db.query(models.User).filter(models.User.id == token_data.sub).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

View File

79
app/api/routes/auth.py Normal file
View File

@ -0,0 +1,79 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import schemas
from app.api import deps
from app.core import security
from app.core.config import settings
from app.db import crud
router = APIRouter(prefix=f"{settings.API_V1_STR}/auth", tags=["auth"])
@router.post("/register", response_model=schemas.User)
def register_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Register a new user.
"""
# Check if user with this email already exists
user = crud.get_user_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="A user with this email already exists in the system.",
)
# Check if user with this username already exists
user = crud.get_user_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=400,
detail="A user with this username already exists in the system.",
)
# Create new user
user = crud.create_user(db, user_in=user_in)
return user
@router.post("/login", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests.
"""
user = crud.authenticate_user(
db, username=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
elif not crud.is_active_user(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.get("/me", response_model=schemas.User)
def read_users_me(
current_user: schemas.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user

26
app/api/routes/health.py Normal file
View File

@ -0,0 +1,26 @@
from typing import Dict
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.api import deps
router = APIRouter(prefix="/health", tags=["health"])
@router.get("", response_model=Dict[str, str])
def health_check(db: Session = Depends(deps.get_db)) -> Dict[str, str]:
"""
Health check endpoint.
"""
try:
# Try to execute a simple query to check database connection
db.execute("SELECT 1")
db_status = "healthy"
except Exception:
db_status = "unhealthy"
return {
"status": "healthy",
"database": db_status
}

0
app/core/__init__.py Normal file
View File

28
app/core/config.py Normal file
View File

@ -0,0 +1,28 @@
from pathlib import Path
from typing import List
from pydantic import AnyHttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "User Authentication Service"
# CORS
CORS_ORIGINS: List[AnyHttpUrl] = []
# JWT
SECRET_KEY: str = "supersecretkey" # In production, set this as an env variable
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Database
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
model_config = SettingsConfigDict(case_sensitive=True)
settings = Settings()

34
app/core/security.py Normal file
View File

@ -0,0 +1,34 @@
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

0
app/db/__init__.py Normal file
View File

71
app/db/crud.py Normal file
View File

@ -0,0 +1,71 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
def get_user(db: Session, user_id: int) -> Optional[User]:
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user_in: UserCreate) -> User:
db_user = User(
email=user_in.email,
username=user_in.username,
hashed_password=get_password_hash(user_in.password),
is_active=True,
is_superuser=False,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user(
db: Session,
db_user: User,
user_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
user_data = user_in.model_dump(exclude_unset=True) if isinstance(user_in, UserUpdate) else user_in
if user_data.get("password"):
hashed_password = get_password_hash(user_data["password"])
del user_data["password"]
user_data["hashed_password"] = hashed_password
for field, value in user_data.items():
setattr(db_user, field, value)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
user = get_user_by_username(db, username=username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active_user(user: User) -> bool:
return user.is_active

21
app/db/session.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # for SQLite only
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

1
app/models/__init__.py Normal file
View File

@ -0,0 +1 @@
from app.models.user import User # noqa

17
app/models/user.py Normal file
View File

@ -0,0 +1,17 @@
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from app.db.session import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

2
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from app.schemas.user import User, UserCreate, UserInDB, UserUpdate # noqa
from app.schemas.token import Token, TokenPayload # noqa

12
app/schemas/token.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

40
app/schemas/user.py Normal file
View File

@ -0,0 +1,40 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
class UserBase(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
is_active: Optional[bool] = True
class UserCreate(UserBase):
email: EmailStr
username: str
password: str = Field(..., min_length=8)
class UserUpdate(UserBase):
password: Optional[str] = Field(None, min_length=8)
class UserInDBBase(UserBase):
id: int
email: EmailStr
username: str
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class User(UserInDBBase):
pass
class UserInDB(UserInDBBase):
hashed_password: str

28
main.py Normal file
View File

@ -0,0 +1,28 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import auth, health
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description="User Authentication Service API",
version="0.1.0",
)
# Set up CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth.router)
app.include_router(health.router)
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
fastapi==0.110.0
uvicorn==0.28.0
sqlalchemy==2.0.28
pydantic==2.6.3
alembic==1.13.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.7
ruff==0.2.2