136 lines
4.5 KiB
Python

from typing import Any, Dict, List, Optional, Union
from sqlalchemy.orm import Session
from app.models.payment import Payment
from app.models.invoice import Invoice, InvoiceStatus
from app.schemas.payment import PaymentCreate, PaymentUpdate
def get_payment(db: Session, user_id: int, payment_id: int) -> Optional[Payment]:
"""Get a payment by ID for a specific user."""
return db.query(Payment).join(Invoice).filter(
Payment.id == payment_id, Invoice.user_id == user_id
).first()
def get_payments_for_invoice(
db: Session, user_id: int, invoice_id: int, skip: int = 0, limit: int = 100
) -> List[Payment]:
"""Get a list of payments for a specific invoice."""
return db.query(Payment).join(Invoice).filter(
Payment.invoice_id == invoice_id, Invoice.user_id == user_id
).offset(skip).limit(limit).all()
def get_payments(
db: Session, user_id: int, skip: int = 0, limit: int = 100
) -> List[Payment]:
"""Get a list of all payments for a user."""
return db.query(Payment).join(Invoice).filter(
Invoice.user_id == user_id
).order_by(Payment.payment_date.desc()).offset(skip).limit(limit).all()
def create_payment(db: Session, user_id: int, payment_in: PaymentCreate) -> Payment:
"""Create a new payment."""
# Get the invoice to ensure it belongs to the user
invoice = db.query(Invoice).filter(
Invoice.id == payment_in.invoice_id, Invoice.user_id == user_id
).first()
if not invoice:
raise ValueError("Invoice not found or does not belong to the user")
# Create payment
payment_data = payment_in.model_dump()
db_payment = Payment(**payment_data)
db.add(db_payment)
# Calculate total paid amount for the invoice
existing_payments = db.query(Payment).filter(Payment.invoice_id == invoice.id).all()
total_paid = sum(payment.amount for payment in existing_payments) + payment_in.amount
# Update invoice status if fully paid
if total_paid >= invoice.total:
invoice.status = InvoiceStatus.PAID
elif invoice.status == InvoiceStatus.DRAFT:
# If invoice is in draft, move to sent when payment is received
invoice.status = InvoiceStatus.SENT
db.commit()
db.refresh(db_payment)
return db_payment
def update_payment(
db: Session,
user_id: int,
db_payment: Payment,
payment_in: Union[PaymentUpdate, Dict[str, Any]]
) -> Payment:
"""Update a payment."""
payment_data = db_payment.to_dict()
if isinstance(payment_in, dict):
update_data = payment_in
else:
update_data = payment_in.model_dump(exclude_unset=True)
# Update payment fields
for field in payment_data:
if field in update_data:
setattr(db_payment, field, update_data[field])
db.add(db_payment)
# Get the invoice
invoice = db.query(Invoice).filter(Invoice.id == db_payment.invoice_id).first()
# Calculate total paid amount for the invoice
existing_payments = db.query(Payment).filter(Payment.invoice_id == invoice.id).all()
total_paid = sum(payment.amount for payment in existing_payments)
# Update invoice status based on payment
if total_paid >= invoice.total:
invoice.status = InvoiceStatus.PAID
elif invoice.status == InvoiceStatus.PAID:
# If previously fully paid but now partially paid
invoice.status = InvoiceStatus.SENT
db.commit()
db.refresh(db_payment)
return db_payment
def delete_payment(db: Session, user_id: int, payment_id: int) -> Optional[Payment]:
"""Delete a payment."""
payment = db.query(Payment).join(Invoice).filter(
Payment.id == payment_id, Invoice.user_id == user_id
).first()
if not payment:
return None
# Get the invoice
invoice = db.query(Invoice).filter(Invoice.id == payment.invoice_id).first()
# Delete the payment
db.delete(payment)
# Calculate total paid amount for the invoice after deletion
existing_payments = db.query(Payment).filter(
Payment.invoice_id == invoice.id, Payment.id != payment_id
).all()
total_paid = sum(payment.amount for payment in existing_payments)
# Update invoice status based on payment
if total_paid >= invoice.total:
invoice.status = InvoiceStatus.PAID
elif invoice.status == InvoiceStatus.PAID:
# If previously fully paid but now partially paid
invoice.status = InvoiceStatus.SENT
db.commit()
return payment