Automated Action f2d3f2d55c Create FastAPI REST API with Python and SQLite
- Implemented user authentication with JWT
- Added CRUD operations for users and items
- Setup database connection with SQLAlchemy
- Added migration scripts for easy database setup
- Included health check endpoint for monitoring

generated with BackendIM... (backend.im)
2025-05-13 18:33:54 +00:00

143 lines
4.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from datetime import timedelta
from app.database import get_db
from app.models import User
from app.schemas import UserCreate, UserUpdate, UserResponse, Token
from app.utils.security import (
get_password_hash,
authenticate_user,
create_access_token,
get_current_active_user,
ACCESS_TOKEN_EXPIRE_MINUTES,
)
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
# Check if user with this email already exists
db_user_email = db.query(User).filter(User.email == user.email).first()
if db_user_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
# Check if user with this username already exists
db_user_username = db.query(User).filter(User.username == user.username).first()
if db_user_username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken",
)
# Create new user
hashed_password = get_password_hash(user.password)
db_user = User(
email=user.email,
username=user.username,
hashed_password=hashed_password,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.get("/me", response_model=UserResponse)
def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@router.put("/me", response_model=UserResponse)
def update_user(
user_update: UserUpdate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db),
):
# Update email if provided and not already in use
if user_update.email is not None and user_update.email != current_user.email:
db_user = db.query(User).filter(User.email == user_update.email).first()
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
current_user.email = user_update.email
# Update username if provided and not already in use
if user_update.username is not None and user_update.username != current_user.username:
db_user = db.query(User).filter(User.username == user_update.username).first()
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken",
)
current_user.username = user_update.username
# Update password if provided
if user_update.password is not None:
current_user.hashed_password = get_password_hash(user_update.password)
# Update is_active if provided
if user_update.is_active is not None:
current_user.is_active = user_update.is_active
db.commit()
db.refresh(current_user)
return current_user
@router.post("/token", response_model=Token)
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db),
):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/", response_model=List[UserResponse])
def read_users(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db),
):
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.get("/{user_id}", response_model=UserResponse)
def read_user(
user_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db),
):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return db_user