Automated Action d4b0ceed9c Implement Task Manager API with FastAPI and SQLite
- Setup project structure and FastAPI application
- Configure SQLite database with SQLAlchemy ORM
- Setup Alembic for database migrations
- Implement user authentication with JWT
- Create task models and CRUD operations
- Implement task assignment functionality
- Add detailed API documentation
- Create comprehensive README with usage instructions
- Lint code with Ruff
2025-06-08 18:02:43 +00:00

177 lines
4.5 KiB
Python

from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.core.deps import get_current_active_superuser, get_current_user, get_db
from app.core.security import get_password_hash
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate
router = APIRouter()
@router.get("/", response_model=List[UserSchema])
def read_users(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Retrieve all users (superuser only).
"""
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.post("/", response_model=UserSchema)
def create_user(
*,
db: Session = Depends(get_db),
user_in: UserCreate,
) -> Any:
"""
Create a new user.
"""
# Check if user with same email exists
user = db.query(User).filter(User.email == user_in.email).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists.",
)
# Check if user with same username exists
user = db.query(User).filter(User.username == user_in.username).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists.",
)
# Create user object
user_data = user_in.model_dump(exclude={"password"})
user = User(
**user_data,
hashed_password=get_password_hash(user_in.password),
)
# Save to database
db.add(user)
db.commit()
db.refresh(user)
return user
@router.get("/me", response_model=UserSchema)
def read_user_me(
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=UserSchema)
def update_user_me(
*,
db: Session = Depends(get_db),
user_in: UserUpdate,
current_user: User = Depends(get_current_user),
) -> Any:
"""
Update current user.
"""
# Convert model to dict
user_data = jsonable_encoder(current_user)
# Update with new data
update_data = user_in.model_dump(exclude_unset=True)
# If password in update data, hash it
if "password" in update_data and update_data["password"]:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
# Update user object
for field in user_data:
if field in update_data:
setattr(current_user, field, update_data[field])
# Save to database
db.add(current_user)
db.commit()
db.refresh(current_user)
return current_user
@router.get("/{user_id}", response_model=UserSchema)
def read_user_by_id(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get a specific user by id.
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Only superusers can access other users' data
if user.id != current_user.id and not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to access this user",
)
return user
@router.put("/{user_id}", response_model=UserSchema)
def update_user(
*,
db: Session = Depends(get_db),
user_id: int,
user_in: UserUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update a user (superuser only).
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Update with new data
update_data = user_in.model_dump(exclude_unset=True)
# If password in update data, hash it
if "password" in update_data and update_data["password"]:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
# Update user object
for field, value in update_data.items():
if hasattr(user, field):
setattr(user, field, value)
# Save to database
db.add(user)
db.commit()
db.refresh(user)
return user