79 lines
2.7 KiB
Python

from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import models, schemas
from app.core.deps import get_current_active_user, get_db
from app.services.customer import customer_service
# Router instance that the customer path operations in this module register on.
router = APIRouter()
# GET "/" -- list customers for the calling user.
#
# Forwards the id of the user resolved by ``get_current_active_user`` together
# with the ``skip`` (default 0) and ``limit`` (default 100) query values to
# ``customer_service.get_multi_by_user`` and returns whatever that call yields;
# the response is declared as ``List[schemas.Customer]``.
@router.get("/", response_model=List[schemas.Customer])
def read_customers(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(get_current_active_user),
) -> Any:
    return customer_service.get_multi_by_user(
        db,
        user_id=current_user.id,
        skip=skip,
        limit=limit,
    )
# POST "/" -- create a customer.
#
# Hands the request payload (``schemas.CustomerCreate``) and the id of the user
# resolved by ``get_current_active_user`` to ``customer_service.create`` and
# returns its result; the response is declared as ``schemas.Customer``.
@router.post("/", response_model=schemas.Customer)
def create_customer(
    *,
    db: Session = Depends(get_db),
    customer_in: schemas.CustomerCreate,
    current_user: models.User = Depends(get_current_active_user),
) -> Any:
    return customer_service.create(
        db=db,
        obj_in=customer_in,
        user_id=current_user.id,
    )
# PUT "/{customer_id}" -- update a customer.
#
# Looks the record up through ``customer_service.get``; responds 404 when the
# lookup result is falsy and 400 when the record's ``user_id`` differs from the
# current user's id. Otherwise passes the record and the ``CustomerUpdate``
# payload to ``customer_service.update`` and returns its result.
@router.put("/{customer_id}", response_model=schemas.Customer)
def update_customer(
    *,
    db: Session = Depends(get_db),
    customer_id: int,
    customer_in: schemas.CustomerUpdate,
    current_user: models.User = Depends(get_current_active_user),
) -> Any:
    existing = customer_service.get(db=db, customer_id=customer_id)
    if not existing:
        raise HTTPException(
            status_code=404,
            detail="Customer not found",
        )

    # NOTE(review): an ownership mismatch is reported as 400; 403 Forbidden is
    # the conventional status -- confirm whether clients rely on 400.
    if existing.user_id != current_user.id:
        raise HTTPException(
            status_code=400,
            detail="Not enough permissions",
        )

    return customer_service.update(db=db, db_obj=existing, obj_in=customer_in)
# GET "/{customer_id}" -- fetch one customer.
#
# Looks the record up through ``customer_service.get``; responds 404 when the
# lookup result is falsy and 400 when the record's ``user_id`` differs from the
# current user's id. Otherwise returns the record as fetched.
@router.get("/{customer_id}", response_model=schemas.Customer)
def read_customer(
    *,
    db: Session = Depends(get_db),
    customer_id: int,
    current_user: models.User = Depends(get_current_active_user),
) -> Any:
    record = customer_service.get(db=db, customer_id=customer_id)
    if not record:
        raise HTTPException(
            status_code=404,
            detail="Customer not found",
        )

    # NOTE(review): an ownership mismatch is reported as 400; 403 Forbidden is
    # the conventional status -- confirm whether clients rely on 400.
    if record.user_id != current_user.id:
        raise HTTPException(
            status_code=400,
            detail="Not enough permissions",
        )

    return record
# DELETE "/{customer_id}" -- remove a customer.
#
# Looks the record up through ``customer_service.get``; responds 404 when the
# lookup result is falsy and 400 when the record's ``user_id`` differs from the
# current user's id. Otherwise calls ``customer_service.remove`` with the same
# id and returns that call's result (not the record fetched for the checks).
@router.delete("/{customer_id}", response_model=schemas.Customer)
def delete_customer(
    *,
    db: Session = Depends(get_db),
    customer_id: int,
    current_user: models.User = Depends(get_current_active_user),
) -> Any:
    target = customer_service.get(db=db, customer_id=customer_id)
    if not target:
        raise HTTPException(
            status_code=404,
            detail="Customer not found",
        )

    # NOTE(review): an ownership mismatch is reported as 400; 403 Forbidden is
    # the conventional status -- confirm whether clients rely on 400.
    if target.user_id != current_user.id:
        raise HTTPException(
            status_code=400,
            detail="Not enough permissions",
        )

    return customer_service.remove(db=db, customer_id=customer_id)