
- Implemented complete authentication system with JWT tokens
- Created user management with registration and profile endpoints
- Built client management with full CRUD operations
- Developed invoice system with line items and automatic calculations
- Set up SQLite database with proper migrations using Alembic
- Added health monitoring and API documentation
- Configured CORS for cross-origin requests
- Included comprehensive README with usage examples
76 lines
2.5 KiB
Python
76 lines
2.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from app.db.session import get_db
|
|
from app.core.security import get_current_user
|
|
from app.models.user import User
|
|
from app.models.client import Client
|
|
from app.schemas.client import ClientCreate, ClientUpdate, ClientResponse
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post("/", response_model=ClientResponse, status_code=status.HTTP_201_CREATED)
async def create_client(
    client: ClientCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Store a new client that belongs to the current user.

    Args:
        client: Request payload; its fields are forwarded to the ``Client``
            constructor.
        db: Database session provided by ``get_db``.
        current_user: User provided by ``get_current_user``; its ``id`` is
            stored as the new client's ``owner_id``.

    Returns:
        The newly stored ``Client`` instance, refreshed from the database.
    """
    payload = client.dict()
    new_client = Client(**payload, owner_id=current_user.id)

    db.add(new_client)
    db.commit()
    # Reload the instance's state from the database before handing it back.
    db.refresh(new_client)

    return new_client
|
|
|
|
@router.get("/", response_model=List[ClientResponse])
async def read_clients(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of the clients owned by the current user.

    Args:
        skip: Number of matching rows to skip; must be >= 0.
        limit: Maximum number of rows to return; must be >= 0.
        db: Database session provided by ``get_db``.
        current_user: User provided by ``get_current_user``; only clients whose
            ``owner_id`` equals this user's ``id`` are returned.

    Returns:
        A list of ``Client`` rows ordered by ``Client.id``.

    Raises:
        HTTPException: 422 if ``skip`` or ``limit`` is negative.
    """
    # A negative LIMIT/OFFSET must not reach the database: SQLite treats a
    # negative LIMIT as "no limit", and other backends reject it outright.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=422,
            detail="skip and limit must be non-negative",
        )

    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land on
    # which page, so order by the primary key to keep pagination stable.
    return (
        db.query(Client)
        .filter(Client.owner_id == current_user.id)
        .order_by(Client.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@router.get("/{client_id}", response_model=ClientResponse)
async def read_client(
    client_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch a single client by id, restricted to the current user's clients.

    Args:
        client_id: Value matched against ``Client.id``.
        db: Database session provided by ``get_db``.
        current_user: User provided by ``get_current_user``; the client's
            ``owner_id`` must equal this user's ``id``.

    Returns:
        The matching ``Client`` row.

    Raises:
        HTTPException: 404 when no row matches both the id and the owner.
    """
    owned_lookup = db.query(Client).filter(
        Client.id == client_id,
        Client.owner_id == current_user.id,
    )
    found = owned_lookup.first()

    if found is None:
        raise HTTPException(status_code=404, detail="Client not found")

    return found
|
|
|
|
@router.put("/{client_id}", response_model=ClientResponse)
async def update_client(
    client_id: int,
    client_update: ClientUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply a partial update to one of the current user's clients.

    Args:
        client_id: Value matched against ``Client.id``.
        client_update: Request payload; only the fields that were explicitly
            set on it are copied onto the stored client.
        db: Database session provided by ``get_db``.
        current_user: User provided by ``get_current_user``; the client's
            ``owner_id`` must equal this user's ``id``.

    Returns:
        The updated ``Client`` row, refreshed from the database.

    Raises:
        HTTPException: 404 when no row matches both the id and the owner.
    """
    owned_lookup = db.query(Client).filter(
        Client.id == client_id,
        Client.owner_id == current_user.id,
    )
    target = owned_lookup.first()
    if target is None:
        raise HTTPException(status_code=404, detail="Client not found")

    # exclude_unset leaves out fields the request did not provide, so those
    # attributes keep their stored values.
    changes = client_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@router.delete("/{client_id}")
async def delete_client(
    client_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete one of the current user's clients.

    Args:
        client_id: Value matched against ``Client.id``.
        db: Database session provided by ``get_db``.
        current_user: User provided by ``get_current_user``; the client's
            ``owner_id`` must equal this user's ``id``.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 when no row matches both the id and the owner.
    """
    owned_lookup = db.query(Client).filter(
        Client.id == client_id,
        Client.owner_id == current_user.id,
    )
    doomed = owned_lookup.first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Client not found")

    db.delete(doomed)
    db.commit()

    confirmation = {"message": "Client deleted successfully"}
    return confirmation