Automated Action b78ac1f072 Create comprehensive gym membership management system
Features:
- User registration and authentication with JWT tokens
- Multi-level admin access (Admin and Super Admin)
- Gym management with membership plans
- Subscription management with payment integration
- Stripe and Paystack payment gateway support
- Role-based access control
- SQLite database with Alembic migrations
- Comprehensive API endpoints with FastAPI
- Database models for users, gyms, memberships, subscriptions, and transactions
- Admin endpoints for user management and financial reporting
- Health check and documentation endpoints

Core Components:
- FastAPI application with CORS support
- SQLAlchemy ORM with relationship mapping
- JWT-based authentication with bcrypt password hashing
- Payment service abstraction for multiple gateways
- Pydantic schemas for request/response validation
- Alembic database migration system
- Admin dashboard functionality
- Environment variable configuration
2025-06-20 09:08:21 +00:00

223 lines
6.4 KiB
Python

import secrets
from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.core.deps import get_current_admin_user, get_current_super_admin_user
from app.core.security import get_password_hash
from app.db.session import get_db
from app.models.gym import Gym
from app.models.membership import GymMembership
from app.models.subscription import Subscription
from app.models.transaction import Transaction, TransactionStatus
from app.models.user import User, UserRole
from app.schemas.user import User as UserSchema, AdminInvite
# Router that collects the admin endpoints defined below. No URL prefix is set
# here; presumably one is applied where this router is included -- verify.
router = APIRouter()
@router.get("/users", response_model=List[UserSchema])
def get_all_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Return one page of users, ordered by id.

    Access is gated by the ``get_current_admin_user`` dependency.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        db: Database session supplied by ``get_db``.
        current_user: The authenticated caller; only used for authorization.

    Returns:
        The ``User`` rows of the requested page, serialized via ``UserSchema``.
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page; ordering by id keeps consecutive pages stable and
    # non-overlapping.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
@router.get("/users/{user_id}", response_model=UserSchema)
def get_user_by_id(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Return the user whose id equals ``user_id``.

    Access is gated by the ``get_current_admin_user`` dependency.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    match = db.query(User).filter(User.id == user_id).first()
    if match:
        return match
    raise HTTPException(status_code=404, detail="User not found")
@router.get("/users/{user_id}/subscriptions")
def get_user_subscriptions(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Return every subscription whose ``user_id`` matches the given user.

    Access is gated by the ``get_current_admin_user`` dependency.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    # Check the user exists first so an unknown id yields 404 rather than
    # an empty list.
    owner = db.query(User).filter(User.id == user_id).first()
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    by_owner = db.query(Subscription).filter(Subscription.user_id == user_id)
    return by_owner.all()
@router.get("/users/{user_id}/transactions")
def get_user_transactions(
    user_id: int,
    db: Session = Depends(get_db),
    # Financial data is restricted to super admins.
    current_user: User = Depends(get_current_super_admin_user),
):
    """Return every transaction whose ``user_id`` matches the given user.

    Access is gated by the ``get_current_super_admin_user`` dependency.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    # Check the user exists first so an unknown id yields 404 rather than
    # an empty list.
    owner = db.query(User).filter(User.id == user_id).first()
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    by_owner = db.query(Transaction).filter(Transaction.user_id == user_id)
    return by_owner.all()
@router.get("/stats/overview")
def get_overview_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_user),
):
    """Return headline counts for the admin overview.

    Access is gated by the ``get_current_admin_user`` dependency.

    Returns:
        A dict with four counts: users whose role is ``UserRole.USER``,
        gyms with ``is_active`` set, all gym memberships, and subscriptions
        whose status equals ``"active"``.
    """
    user_count = db.query(func.count(User.id)).filter(User.role == UserRole.USER)
    gym_count = db.query(func.count(Gym.id)).filter(Gym.is_active)
    membership_count = db.query(func.count(GymMembership.id))
    active_sub_count = db.query(func.count(Subscription.id)).filter(
        Subscription.status == "active"
    )

    # The dict literal is evaluated top to bottom, so the four queries run
    # in the order listed.
    return {
        "total_users": user_count.scalar(),
        "total_gyms": gym_count.scalar(),
        "total_memberships": membership_count.scalar(),
        "active_subscriptions": active_sub_count.scalar(),
    }
@router.get("/stats/financial")
def get_financial_stats(
    db: Session = Depends(get_db),
    # Financial data is restricted to super admins.
    current_user: User = Depends(get_current_super_admin_user),
):
    """Return revenue totals and the failed-transaction count.

    Access is gated by the ``get_current_super_admin_user`` dependency.

    Returns:
        A dict with the summed ``amount`` of completed transactions, the
        summed ``amount`` of pending transactions, and the number of failed
        transactions. A sum with no result is reported as ``0``.
    """

    def _amount_total(status):
        # The aggregate may come back falsy (e.g. None); report that as 0.
        summed = (
            db.query(func.sum(Transaction.amount))
            .filter(Transaction.status == status)
            .scalar()
        )
        return summed or 0

    completed_total = _amount_total(TransactionStatus.COMPLETED)
    pending_total = _amount_total(TransactionStatus.PENDING)
    failed_count = (
        db.query(func.count(Transaction.id))
        .filter(Transaction.status == TransactionStatus.FAILED)
        .scalar()
    )

    return {
        "total_revenue": completed_total,
        "pending_revenue": pending_total,
        "failed_transactions": failed_count,
    }
@router.get("/transactions")
def get_all_transactions(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(
        get_current_super_admin_user
    ),  # Only super admin can view all transactions
):
    """Return one page of transactions, ordered by id.

    Access is gated by the ``get_current_super_admin_user`` dependency.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        db: Database session supplied by ``get_db``.
        current_user: The authenticated caller; only used for authorization.

    Returns:
        The ``Transaction`` rows of the requested page.
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page; ordering by id keeps consecutive pages stable and
    # non-overlapping.
    return (
        db.query(Transaction)
        .order_by(Transaction.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
@router.post("/invite-admin")
def invite_admin(
    admin_data: AdminInvite,
    db: Session = Depends(get_db),
    current_user: User = Depends(
        get_current_super_admin_user
    ),  # Only super admin can invite admins
):
    """Create an account for an invited admin with a random temporary password.

    Access is gated by the ``get_current_super_admin_user`` dependency. The
    new user records the caller's id in ``invited_by``.

    Args:
        admin_data: Email, full name and role of the account to create.
        db: Database session supplied by ``get_db``.
        current_user: The super admin issuing the invite.

    Returns:
        A dict with a confirmation message, the new user's id, and the
        generated temporary password.

    Raises:
        HTTPException: 400 when the email is already registered, or when the
            requested role is ``UserRole.SUPER_ADMIN``.
    """
    existing_user = db.query(User).filter(User.email == admin_data.email).first()
    if existing_user:
        raise HTTPException(
            status_code=400, detail="User with this email already exists"
        )
    if admin_data.role == UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=400, detail="Cannot invite super admin")

    # Generate a fresh, unguessable password for every invite. A fixed
    # literal would give every invited admin the same known credential.
    temp_password = secrets.token_urlsafe(16)

    new_admin = User(
        email=admin_data.email,
        full_name=admin_data.full_name,
        role=admin_data.role,
        hashed_password=get_password_hash(temp_password),
        invited_by=current_user.id,
    )
    db.add(new_admin)
    db.commit()
    db.refresh(new_admin)  # reload so the generated id is populated

    return {
        "message": "Admin invited successfully",
        "admin_id": new_admin.id,
        # Still returned because no other delivery channel is visible here.
        # TODO: deliver out-of-band (e.g. email) and drop it from the response.
        "temporary_password": temp_password,
    }
@router.delete("/remove-admin/{admin_id}")
def remove_admin(
    admin_id: int,
    db: Session = Depends(get_db),
    # Removing admins is restricted to super admins.
    current_user: User = Depends(get_current_super_admin_user),
):
    """Deactivate an admin account.

    The row is not deleted: ``is_active`` is set to ``False`` and the change
    is committed. Access is gated by the ``get_current_super_admin_user``
    dependency.

    Raises:
        HTTPException: 404 when ``admin_id`` does not match a user with an
            admin or super-admin role; 400 when the match is a super admin.
    """
    privileged_roles = [UserRole.ADMIN, UserRole.SUPER_ADMIN]
    target = (
        db.query(User)
        .filter(User.id == admin_id, User.role.in_(privileged_roles))
        .first()
    )

    if not target:
        raise HTTPException(status_code=404, detail="Admin not found")
    if target.role == UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=400, detail="Cannot remove super admin")

    target.is_active = False
    db.commit()
    return {"message": "Admin removed successfully"}
@router.get("/admins", response_model=List[UserSchema])
def get_all_admins(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_super_admin_user),
):
    """Return every user whose role is admin or super admin.

    Access is gated by the ``get_current_super_admin_user`` dependency.

    The response is serialized through ``UserSchema``, the same schema the
    ``/users`` endpoint uses. Returning the ORM rows without a response
    model would serialize every model attribute, ``hashed_password``
    included.

    Returns:
        The matching ``User`` rows, shaped by ``UserSchema``.
    """
    admins = (
        db.query(User)
        .filter(User.role.in_([UserRole.ADMIN, UserRole.SUPER_ADMIN]))
        .all()
    )
    return admins