Automated Action 794d172f85 Add user authentication system with login, signup, and JWT tokens
- Added user model and schema definitions
- Implemented JWT token authentication
- Created endpoints for user registration and login
- Added secure password hashing with bcrypt
- Set up SQLite database with SQLAlchemy
- Created Alembic migrations
- Added user management endpoints
- Included health check endpoint

generated with BackendIM... (backend.im)
2025-05-11 22:51:17 +00:00

104 lines
3.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.db.database import get_db
from app.db.repositories import user_repository
from app.schemas.user import User, UserUpdate
from app.services.auth_service import auth_service
from app.models.user import User as UserModel
# Router that the user endpoints in this module register themselves on.
router = APIRouter()
@router.get("/", response_model=List[User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    List users one page at a time.

    ``skip`` and ``limit`` are forwarded unchanged to
    ``user_repository.list``. ``current_user`` is never read in the body;
    it is declared so the request has to resolve the
    ``auth_service.get_current_user`` dependency before the listing runs.
    """
    return user_repository.list(db, skip=skip, limit=limit)
@router.get("/{user_id}", response_model=User)
def read_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Fetch a single user by id.

    Raises:
        HTTPException: 404 ("User not found") when the repository lookup
            returns ``None``.
    """
    found = user_repository.get_by_id(db, user_id)
    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.put("/{user_id}", response_model=User)
def update_user(
    user_id: int,
    user_in: UserUpdate,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Apply a partial update to the caller's own user record.

    Checks run in this order: ownership, existence, email uniqueness,
    username uniqueness. Only fields explicitly set on ``user_in`` are
    passed on to ``user_repository.update``.

    Raises:
        HTTPException: 403 when ``user_id`` is not the authenticated
            user's id; 404 when the user does not exist; 400 when the
            requested email or username already belongs to another record.
    """
    # Ownership gate: a user may only modify their own record.
    is_owner = current_user.id == user_id
    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

    target = user_repository.get_by_id(db, user_id)
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # The email is looked up only when it is supplied and actually changes.
    new_email = user_in.email
    if new_email and new_email != target.email:
        if user_repository.get_by_email(db, new_email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

    # Same rule for the username.
    new_username = user_in.username
    if new_username and new_username != target.username:
        if user_repository.get_by_username(db, new_username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already taken",
            )

    # exclude_unset keeps omitted fields out of the change set.
    changes = user_in.model_dump(exclude_unset=True)
    return user_repository.update(db, target, changes)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Remove the caller's own account.

    Returns nothing; the route is declared with a 204 status code.

    Raises:
        HTTPException: 403 when ``user_id`` is not the authenticated
            user's id; 404 when the user does not exist.
    """
    # Ownership gate: a user may only delete their own account.
    is_owner = current_user.id == user_id
    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

    target = user_repository.get_by_id(db, user_id)
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    user_repository.delete(db, target)