Automated Action 771ee5214f Implement LinkedIn-based church management system with FastAPI
- Complete FastAPI application with authentication and JWT tokens
- SQLite database with SQLAlchemy ORM and Alembic migrations
- User management with profile features and search functionality
- LinkedIn-style networking with connection requests and acceptance
- Social features: posts, likes, comments, announcements, prayer requests
- Event management with registration system and capacity limits
- RESTful API endpoints for all features with proper authorization
- Comprehensive documentation and setup instructions

Key Features:
- JWT-based authentication with bcrypt password hashing
- User profiles with bio, position, contact information
- Connection system for church member networking
- Community feed with post interactions
- Event creation, registration, and attendance tracking
- Admin role-based permissions
- Health check endpoint and API documentation

Environment Variables Required:
- SECRET_KEY: JWT secret key for token generation
2025-07-01 12:28:10 +00:00

224 lines
6.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.base import get_db
from app.models.user import User
from app.models.event import Event, EventRegistration
from app.schemas.event import (
EventCreate,
EventUpdate,
EventResponse,
EventRegistrationResponse,
)
from app.api.auth import get_current_user
# Router on which the event endpoints defined in this module are registered.
router = APIRouter()
@router.post("/", response_model=EventResponse)
def create_event(
    event: EventCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Store a new event owned by the authenticated user.

    Args:
        event: Payload with the fields of the event to create.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user; their id is saved as ``created_by``.

    Returns:
        The persisted ``Event`` instance, refreshed after the commit.
    """
    payload = event.dict()
    new_event = Event(created_by=current_user.id, **payload)

    db.add(new_event)
    db.commit()
    # Reload the instance's state from the database after the commit.
    db.refresh(new_event)

    return new_event
@router.get("/", response_model=List[EventResponse])
def get_events(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of public events, annotated for the requesting user.

    Each returned event gets three extra attributes: ``creator_name``,
    ``registered_count`` (number of registrations for the event) and
    ``is_registered`` (whether ``current_user`` has a registration for it).

    Args:
        skip: Number of matching events to skip.
        limit: Maximum number of events to return.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user used to compute ``is_registered``.

    Returns:
        A list of ``Event`` instances ordered by id.
    """
    # Function-scope import: the module's import block does not provide `func`.
    from sqlalchemy import func

    events = (
        db.query(Event)
        .filter(Event.is_public)
        # Without ORDER BY the row order is unspecified, so offset/limit pages
        # could overlap or skip rows between requests; sort on a stable key.
        .order_by(Event.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    if not events:
        return events

    event_ids = [event.id for event in events]

    # One grouped COUNT for the whole page instead of one COUNT per event.
    count_rows = (
        db.query(EventRegistration.event_id, func.count())
        .filter(EventRegistration.event_id.in_(event_ids))
        .group_by(EventRegistration.event_id)
        .all()
    )
    registered_counts = {event_id: total for event_id, total in count_rows}

    # One query for the caller's registrations instead of one lookup per event.
    own_rows = (
        db.query(EventRegistration.event_id)
        .filter(
            EventRegistration.event_id.in_(event_ids),
            EventRegistration.user_id == current_user.id,
        )
        .all()
    )
    own_event_ids = {row[0] for row in own_rows}

    for event in events:
        event.creator_name = f"{event.creator.first_name} {event.creator.last_name}"
        # Events without any registration are absent from the grouped result.
        event.registered_count = registered_counts.get(event.id, 0)
        event.is_registered = event.id in own_event_ids
    return events
@router.get("/{event_id}", response_model=EventResponse)
def get_event(
    event_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch a single event by id, annotated for the requesting user.

    Args:
        event_id: Primary key of the event to fetch.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user used to compute ``is_registered``.

    Returns:
        The ``Event`` with ``creator_name``, ``registered_count`` and
        ``is_registered`` attributes set.

    Raises:
        HTTPException: 404 when no event has the given id.
    """
    event = db.query(Event).filter(Event.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Event not found")

    event.creator_name = f"{event.creator.first_name} {event.creator.last_name}"

    # Total number of registrations recorded for this event.
    all_registrations = db.query(EventRegistration).filter(
        EventRegistration.event_id == event.id
    )
    event.registered_count = all_registrations.count()

    # Does the caller have a registration row for this event?
    own_registration = (
        db.query(EventRegistration)
        .filter(
            EventRegistration.event_id == event.id,
            EventRegistration.user_id == current_user.id,
        )
        .first()
    )
    event.is_registered = own_registration is not None

    return event
@router.put("/{event_id}", response_model=EventResponse)
def update_event(
    event_id: int,
    event_update: EventUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply a partial update to an event.

    Only fields explicitly set in ``event_update`` are written to the event.

    Args:
        event_id: Primary key of the event to modify.
        event_update: Payload carrying the fields to change.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user; must be the event's creator or an
            admin.

    Returns:
        The updated ``Event`` instance, refreshed after the commit.

    Raises:
        HTTPException: 404 when the event does not exist, 403 when the caller
            is neither its creator nor an admin.
    """
    event = db.query(Event).filter(Event.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Event not found")

    is_owner = event.created_by == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(
            status_code=403, detail="Not authorized to update this event"
        )

    # exclude_unset leaves out fields the client did not send.
    changes = event_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(event, attr_name, changes[attr_name])

    db.commit()
    db.refresh(event)
    return event
@router.delete("/{event_id}")
def delete_event(
    event_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete an event.

    Args:
        event_id: Primary key of the event to delete.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user; must be the event's creator or an
            admin.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the event does not exist, 403 when the caller
            is neither its creator nor an admin.
    """
    target = db.query(Event).filter(Event.id == event_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Event not found")

    is_owner = target.created_by == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(
            status_code=403, detail="Not authorized to delete this event"
        )

    db.delete(target)
    db.commit()

    return {"message": "Event deleted successfully"}
@router.post("/{event_id}/register", response_model=EventRegistrationResponse)
def register_for_event(
    event_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register the authenticated user for an event.

    Args:
        event_id: Primary key of the event to register for.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user being registered.

    Returns:
        The new ``EventRegistration`` with a ``user_name`` attribute set.

    Raises:
        HTTPException: 404 when the event does not exist, 400 when the user is
            already registered or the event has reached ``max_attendees``.
    """
    # NOTE(review): the duplicate and capacity checks below run before the
    # insert and are not atomic with it in this function; concurrent requests
    # could presumably both pass. Confirm whether a DB constraint covers this.
    event = db.query(Event).filter(Event.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Event not found")

    # Reject a second registration by the same user.
    duplicate = (
        db.query(EventRegistration)
        .filter(
            EventRegistration.event_id == event_id,
            EventRegistration.user_id == current_user.id,
        )
        .first()
    )
    if duplicate:
        raise HTTPException(status_code=400, detail="Already registered for this event")

    # A falsy max_attendees (None or 0) skips the capacity check entirely.
    capacity = event.max_attendees
    if capacity:
        taken = (
            db.query(EventRegistration)
            .filter(EventRegistration.event_id == event_id)
            .count()
        )
        if taken >= capacity:
            raise HTTPException(status_code=400, detail="Event is full")

    new_registration = EventRegistration(event_id=event_id, user_id=current_user.id)
    db.add(new_registration)
    db.commit()
    db.refresh(new_registration)

    new_registration.user_name = f"{current_user.first_name} {current_user.last_name}"
    return new_registration
@router.delete("/{event_id}/register")
def unregister_from_event(
    event_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove the authenticated user's registration for an event.

    Args:
        event_id: Primary key of the event to unregister from.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user whose registration is removed.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the user has no registration for the event.
    """
    own_registration = (
        db.query(EventRegistration)
        .filter(
            EventRegistration.event_id == event_id,
            EventRegistration.user_id == current_user.id,
        )
        .first()
    )

    if own_registration is None:
        raise HTTPException(status_code=404, detail="Registration not found")

    db.delete(own_registration)
    db.commit()

    return {"message": "Unregistered successfully"}
@router.get("/{event_id}/registrations", response_model=List[EventRegistrationResponse])
def get_event_registrations(
    event_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List all registrations for an event.

    Args:
        event_id: Primary key of the event whose registrations are listed.
        db: Database session provided by the ``get_db`` dependency.
        current_user: Authenticated user; must be the event's creator or an
            admin.

    Returns:
        A list of ``EventRegistration`` instances, each with a ``user_name``
        attribute set.

    Raises:
        HTTPException: 404 when the event does not exist, 403 when the caller
            is neither its creator nor an admin.
    """
    # Function-scope import: the module's import block does not provide
    # `joinedload`.
    from sqlalchemy.orm import joinedload

    event = db.query(Event).filter(Event.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Event not found")
    if event.created_by != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=403, detail="Not authorized to view registrations"
        )

    registrations = (
        db.query(EventRegistration)
        # Fetch each registration's user in the same SELECT so building
        # user_name below does not lazy-load one user per registration.
        .options(joinedload(EventRegistration.user))
        .filter(EventRegistration.event_id == event_id)
        .all()
    )
    for registration in registrations:
        registration.user_name = (
            f"{registration.user.first_name} {registration.user.last_name}"
        )
    return registrations