Automated Action c5e673d2ea Implement complete beat marketplace API with FastAPI
- Set up FastAPI project structure with main.py and requirements.txt
- Created SQLite database models for users, beats, and transactions
- Implemented Alembic database migrations
- Added user authentication system with JWT tokens
- Created beat management endpoints (CRUD operations)
- Implemented purchase/transaction system
- Added file upload/download functionality for beat files
- Created health endpoint and API documentation
- Updated README with comprehensive documentation
- Fixed all linting issues with Ruff

Environment variables needed:
- SECRET_KEY: JWT secret key for authentication
2025-07-06 17:42:23 +00:00

117 lines
4.2 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.session import get_db
from app.models.transaction import Transaction, TransactionStatus
from app.models.beat import Beat
from app.models.user import User
from app.schemas.transaction import TransactionCreate, TransactionResponse
from app.services.auth import get_current_active_user
import uuid
router = APIRouter()
@router.get("/", response_model=List[TransactionResponse])
async def get_user_transactions(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
transactions = db.query(Transaction).filter(Transaction.buyer_id == current_user.id).all()
return transactions
@router.get("/{transaction_id}", response_model=TransactionResponse)
async def get_transaction(
transaction_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
transaction = db.query(Transaction).filter(
Transaction.id == transaction_id,
Transaction.buyer_id == current_user.id
).first()
if not transaction:
raise HTTPException(status_code=404, detail="Transaction not found")
return transaction
@router.post("/purchase", response_model=TransactionResponse)
async def purchase_beat(
purchase_data: TransactionCreate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
# Check if beat exists and is available
beat = db.query(Beat).filter(
Beat.id == purchase_data.beat_id,
Beat.is_available
).first()
if not beat:
raise HTTPException(status_code=404, detail="Beat not found or not available")
# Check if user is trying to buy their own beat
if beat.producer_id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot purchase your own beat")
# Check if user has already purchased this beat
existing_purchase = db.query(Transaction).filter(
Transaction.buyer_id == current_user.id,
Transaction.beat_id == purchase_data.beat_id,
Transaction.status == TransactionStatus.COMPLETED
).first()
if existing_purchase:
raise HTTPException(status_code=400, detail="Beat already purchased")
# Validate amount matches beat price
if purchase_data.amount != beat.price:
raise HTTPException(status_code=400, detail="Amount does not match beat price")
# Create transaction
transaction = Transaction(
buyer_id=current_user.id,
beat_id=purchase_data.beat_id,
amount=purchase_data.amount,
status=TransactionStatus.PENDING,
transaction_reference=str(uuid.uuid4())
)
db.add(transaction)
db.commit()
db.refresh(transaction)
return transaction
@router.put("/{transaction_id}/complete")
async def complete_transaction(
transaction_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
transaction = db.query(Transaction).filter(
Transaction.id == transaction_id,
Transaction.buyer_id == current_user.id,
Transaction.status == TransactionStatus.PENDING
).first()
if not transaction:
raise HTTPException(status_code=404, detail="Transaction not found or not pending")
# In a real application, this would integrate with a payment processor
# For now, we'll just mark it as completed
transaction.status = TransactionStatus.COMPLETED
db.commit()
db.refresh(transaction)
return {"message": "Transaction completed successfully", "transaction": transaction}
@router.get("/producer/sales", response_model=List[TransactionResponse])
async def get_producer_sales(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
if not current_user.is_producer:
raise HTTPException(status_code=403, detail="Not a producer")
# Get all transactions for beats owned by this producer
transactions = db.query(Transaction).join(Beat).filter(
Beat.producer_id == current_user.id,
Transaction.status == TransactionStatus.COMPLETED
).all()
return transactions