from typing import Any, Dict, List, Optional, Union from sqlalchemy.orm import Session from app.models.payment import Payment from app.models.invoice import Invoice, InvoiceStatus from app.schemas.payment import PaymentCreate, PaymentUpdate def get_payment(db: Session, user_id: int, payment_id: int) -> Optional[Payment]: """Get a payment by ID for a specific user.""" return db.query(Payment).join(Invoice).filter( Payment.id == payment_id, Invoice.user_id == user_id ).first() def get_payments_for_invoice( db: Session, user_id: int, invoice_id: int, skip: int = 0, limit: int = 100 ) -> List[Payment]: """Get a list of payments for a specific invoice.""" return db.query(Payment).join(Invoice).filter( Payment.invoice_id == invoice_id, Invoice.user_id == user_id ).offset(skip).limit(limit).all() def get_payments( db: Session, user_id: int, skip: int = 0, limit: int = 100 ) -> List[Payment]: """Get a list of all payments for a user.""" return db.query(Payment).join(Invoice).filter( Invoice.user_id == user_id ).order_by(Payment.payment_date.desc()).offset(skip).limit(limit).all() def create_payment(db: Session, user_id: int, payment_in: PaymentCreate) -> Payment: """Create a new payment.""" # Get the invoice to ensure it belongs to the user invoice = db.query(Invoice).filter( Invoice.id == payment_in.invoice_id, Invoice.user_id == user_id ).first() if not invoice: raise ValueError("Invoice not found or does not belong to the user") # Create payment payment_data = payment_in.model_dump() db_payment = Payment(**payment_data) db.add(db_payment) # Calculate total paid amount for the invoice existing_payments = db.query(Payment).filter(Payment.invoice_id == invoice.id).all() total_paid = sum(payment.amount for payment in existing_payments) + payment_in.amount # Update invoice status if fully paid if total_paid >= invoice.total: invoice.status = InvoiceStatus.PAID elif invoice.status == InvoiceStatus.DRAFT: # If invoice is in draft, move to sent when payment is received invoice.status = InvoiceStatus.SENT db.commit() db.refresh(db_payment) return db_payment def update_payment( db: Session, user_id: int, db_payment: Payment, payment_in: Union[PaymentUpdate, Dict[str, Any]] ) -> Payment: """Update a payment.""" payment_data = db_payment.to_dict() if isinstance(payment_in, dict): update_data = payment_in else: update_data = payment_in.model_dump(exclude_unset=True) # Update payment fields for field in payment_data: if field in update_data: setattr(db_payment, field, update_data[field]) db.add(db_payment) # Get the invoice invoice = db.query(Invoice).filter(Invoice.id == db_payment.invoice_id).first() # Calculate total paid amount for the invoice existing_payments = db.query(Payment).filter(Payment.invoice_id == invoice.id).all() total_paid = sum(payment.amount for payment in existing_payments) # Update invoice status based on payment if total_paid >= invoice.total: invoice.status = InvoiceStatus.PAID elif invoice.status == InvoiceStatus.PAID: # If previously fully paid but now partially paid invoice.status = InvoiceStatus.SENT db.commit() db.refresh(db_payment) return db_payment def delete_payment(db: Session, user_id: int, payment_id: int) -> Optional[Payment]: """Delete a payment.""" payment = db.query(Payment).join(Invoice).filter( Payment.id == payment_id, Invoice.user_id == user_id ).first() if not payment: return None # Get the invoice invoice = db.query(Invoice).filter(Invoice.id == payment.invoice_id).first() # Delete the payment db.delete(payment) # Calculate total paid amount for the invoice after deletion existing_payments = db.query(Payment).filter( Payment.invoice_id == invoice.id, Payment.id != payment_id ).all() total_paid = sum(payment.amount for payment in existing_payments) # Update invoice status based on payment if total_paid >= invoice.total: invoice.status = InvoiceStatus.PAID elif invoice.status == InvoiceStatus.PAID: # If previously fully paid but now partially paid invoice.status = InvoiceStatus.SENT db.commit() return payment