Automated Action 5b55eedd2b Implement User Authentication and Authorization Service
This commit includes:
- User registration and authentication API with JWT
- Password reset functionality
- Role-based access control system
- Database models and migrations with SQLAlchemy and Alembic
- API documentation in README

generated with BackendIM... (backend.im)
2025-05-15 19:46:38 +00:00

225 lines
5.8 KiB
Python

from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.dependencies.auth import get_current_active_user
from app.dependencies.role import check_user_role
from app.models.user import User
from app.schemas.role import Role
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate
from app.services.role import assign_role_to_user, remove_role_from_user
from app.services.user import (
create_user,
delete_user,
get_user_by_email,
get_user_by_id,
get_users,
update_user,
verify_user
)
router = APIRouter()
@router.post("", response_model=UserSchema, status_code=status.HTTP_201_CREATED)
def register_user(
user_in: UserCreate,
db: Session = Depends(get_db)
):
"""
Register a new user.
"""
# Check if user already exists
user = get_user_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User with this email already exists"
)
# Create user
user = create_user(db, user_in=user_in)
return user
@router.get("", response_model=List[UserSchema])
def read_users(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Retrieve users. Only for admins.
"""
users = get_users(db, skip=skip, limit=limit)
return users
@router.get("/me", response_model=UserSchema)
def read_user_me(
current_user: User = Depends(get_current_active_user)
):
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=UserSchema)
def update_user_me(
user_in: UserUpdate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Update current user.
"""
user = update_user(db, user_id=current_user.id, user_in=user_in)
return user
@router.get("/{user_id}", response_model=UserSchema)
def read_user(
user_id: int,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Get a specific user by ID. Only for admins.
"""
user = get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@router.put("/{user_id}", response_model=UserSchema)
def update_user_by_id(
user_id: int,
user_in: UserUpdate,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Update a user. Only for admins.
"""
user = get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
user = update_user(db, user_id=user_id, user_in=user_in)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user_by_id(
user_id: int,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Delete a user. Only for admins.
"""
user = get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
result = delete_user(db, user_id=user_id)
if not result:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to delete user"
)
return None
@router.post("/{user_id}/verify", response_model=UserSchema)
def verify_user_by_id(
user_id: int,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Verify a user. Only for admins.
"""
user = verify_user(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@router.get("/{user_id}/roles", response_model=List[Role])
def read_user_roles(
user_id: int,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Get a user's roles. Only for admins.
"""
user = get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user.roles
@router.post("/{user_id}/roles/{role_id}", status_code=status.HTTP_200_OK)
def add_role_to_user(
user_id: int,
role_id: int,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Add a role to a user. Only for admins.
"""
user_role = assign_role_to_user(db, user_id=user_id, role_id=role_id)
if not user_role:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to assign role to user"
)
return {"message": "Role assigned to user successfully"}
@router.delete("/{user_id}/roles/{role_id}", status_code=status.HTTP_200_OK)
def remove_role_from_user_endpoint(
user_id: int,
role_id: int,
current_user: User = Depends(check_user_role(["admin"])),
db: Session = Depends(get_db)
):
"""
Remove a role from a user. Only for admins.
"""
result = remove_role_from_user(db, user_id=user_id, role_id=role_id)
if not result:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to remove role from user"
)
return {"message": "Role removed from user successfully"}