
- Added user model and schema definitions - Implemented JWT token authentication - Created endpoints for user registration and login - Added secure password hashing with bcrypt - Set up SQLite database with SQLAlchemy - Created Alembic migrations - Added user management endpoints - Included health check endpoint generated with BackendIM... (backend.im)
104 lines
3.4 KiB
Python
104 lines
3.4 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
|
|
from app.db.database import get_db
|
|
from app.db.repositories import user_repository
|
|
from app.schemas.user import User, UserUpdate
|
|
from app.services.auth_service import auth_service
|
|
from app.models.user import User as UserModel
|
|
|
|
# Router on which the user endpoints declared below register themselves.
router = APIRouter()
|
|
|
|
@router.get("/", response_model=List[User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Retrieve a page of users. Only authenticated users can see the list.

    Args:
        skip: Number of records to skip; must not be negative.
        limit: Maximum number of records to return; must not be negative.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Resolved by ``auth_service.get_current_user``. It is not
            read in the body; declaring it makes the request go through that
            dependency before the handler runs.

    Returns:
        The result of ``user_repository.list`` for the requested window,
        serialized through ``List[User]``.

    Raises:
        HTTPException: 422 when ``skip`` or ``limit`` is negative.
    """
    # Reject negative paging values explicitly instead of forwarding them.
    # NOTE(review): presumably the repository maps these to OFFSET/LIMIT; on
    # SQLite a negative LIMIT means "no upper bound", which would silently
    # disable pagination - confirm against user_repository.list.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=422,
            detail="skip and limit must be non-negative",
        )

    return user_repository.list(db, skip=skip, limit=limit)
|
|
|
|
@router.get("/{user_id}", response_model=User)
def read_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Look up a single user by primary key.

    Responds with 404 "User not found" when the repository returns ``None``
    for ``user_id``; otherwise the found record is returned and serialized
    through the ``User`` response model.
    """
    found = user_repository.get_by_id(db, user_id)
    if found is not None:
        return found

    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
@router.put("/{user_id}", response_model=User)
def update_user(
    user_id: int,
    user_in: UserUpdate,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Apply a partial update to a user record.

    The caller may only modify their own record. Checks run in this order:

    1. 403 "Not enough permissions" if ``user_id`` is not the caller's id.
    2. 404 "User not found" if no record exists for ``user_id``.
    3. 400 "Email already registered" if a new, different email is already
       held by some user.
    4. 400 "Username already taken" if a new, different username is already
       held by some user.

    Only the fields the client actually sent (``exclude_unset=True``) are
    handed to ``user_repository.update``; its result is returned.
    """
    # Ownership gate comes first, before any lookup of the target record.
    is_self = current_user.id == user_id
    if not is_self:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
        )

    account = user_repository.get_by_id(db, user_id)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Email must stay unique: only queried when a different value is supplied.
    new_email = user_in.email
    if (
        new_email
        and new_email != account.email
        and user_repository.get_by_email(db, new_email)
    ):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered"
        )

    # Same rule for the username.
    new_username = user_in.username
    if (
        new_username
        and new_username != account.username
        and user_repository.get_by_username(db, new_username)
    ):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Username already taken"
        )

    changes = user_in.model_dump(exclude_unset=True)
    return user_repository.update(db, account, changes)
|
|
|
|
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Remove a user account.

    The caller may only delete their own account: a mismatching ``user_id``
    yields 403 "Not enough permissions". A missing record yields 404
    "User not found". On success the record is passed to
    ``user_repository.delete`` and nothing is returned (204 response).
    """
    # Ownership gate comes first, before any lookup of the target record.
    is_self = current_user.id == user_id
    if not is_self:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
        )

    account = user_repository.get_by_id(db, user_id)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found")

    user_repository.delete(db, account)