from typing import List, Optional, Dict, Any, Union from sqlalchemy.orm import Session from app.models.invoice import Invoice, InvoiceItem from app.schemas.invoice import InvoiceCreate, InvoiceUpdate, InvoiceItemCreate def get_by_id(db: Session, invoice_id: str, user_id: str) -> Optional[Invoice]: """ Get an invoice by ID, ensuring it belongs to the specified user. """ return db.query(Invoice).filter( Invoice.id == invoice_id, Invoice.user_id == user_id ).first() def get_multi_by_user( db: Session, user_id: str, skip: int = 0, limit: int = 100 ) -> List[Invoice]: """ Get multiple invoices for a user. """ return db.query(Invoice).filter(Invoice.user_id == user_id).offset(skip).limit(limit).all() def create(db: Session, *, obj_in: InvoiceCreate, user_id: str) -> Invoice: """ Create a new invoice with items. """ # Extract items from input items_data = obj_in.items obj_data = obj_in.dict(exclude={"items"}) # Create invoice db_obj = Invoice( **obj_data, user_id=user_id, ) db.add(db_obj) db.flush() # Flush to get the invoice ID # Create invoice items for item_data in items_data: db_item = InvoiceItem( **item_data.dict(), invoice_id=db_obj.id, ) db.add(db_item) db.commit() db.refresh(db_obj) return db_obj def update( db: Session, *, db_obj: Invoice, obj_in: Union[InvoiceUpdate, Dict[str, Any]] ) -> Invoice: """ Update an invoice. """ update_data = obj_in if isinstance(obj_in, dict) else obj_in.dict(exclude_unset=True) for field, value in update_data.items(): if hasattr(db_obj, field) and value is not None: setattr(db_obj, field, value) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def remove(db: Session, *, invoice_id: str, user_id: str) -> Optional[Invoice]: """ Delete an invoice. """ invoice = get_by_id(db, invoice_id=invoice_id, user_id=user_id) if not invoice: return None db.delete(invoice) db.commit() return invoice def add_item(db: Session, *, invoice_id: str, user_id: str, obj_in: InvoiceItemCreate) -> Optional[InvoiceItem]: """ Add an item to an invoice. """ invoice = get_by_id(db, invoice_id=invoice_id, user_id=user_id) if not invoice: return None db_obj = InvoiceItem( **obj_in.dict(), invoice_id=invoice_id, ) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def remove_item(db: Session, *, item_id: str, invoice_id: str, user_id: str) -> Optional[InvoiceItem]: """ Remove an item from an invoice. """ # First check if the invoice belongs to the user invoice = get_by_id(db, invoice_id=invoice_id, user_id=user_id) if not invoice: return None # Then get the item item = db.query(InvoiceItem).filter( InvoiceItem.id == item_id, InvoiceItem.invoice_id == invoice_id ).first() if not item: return None db.delete(item) db.commit() return item