Automated Action 771ee5214f Implement LinkedIn-style church management system with FastAPI
- Complete FastAPI application with authentication and JWT tokens
- SQLite database with SQLAlchemy ORM and Alembic migrations
- User management with profile features and search functionality
- LinkedIn-style networking with connection requests and acceptance
- Social features: posts, likes, comments, announcements, prayer requests
- Event management with registration system and capacity limits
- RESTful API endpoints for all features with proper authorization
- Comprehensive documentation and setup instructions

Key Features:
- JWT-based authentication with bcrypt password hashing
- User profiles with bio, position, contact information
- Connection system for church member networking
- Community feed with post interactions
- Event creation, registration, and attendance tracking
- Admin role-based permissions
- Health check endpoint and API documentation

Environment Variables Required:
- SECRET_KEY: JWT secret key for token generation
2025-07-01 12:28:10 +00:00

183 lines
5.6 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.base import get_db
from app.models.user import User, Connection
from app.schemas.connection import (
ConnectionCreate,
ConnectionResponse,
ConnectionUpdate,
)
from app.api.auth import get_current_user
# Router that collects the connection endpoints declared in this module.
router = APIRouter()
@router.post("/", response_model=ConnectionResponse)
def send_connection_request(
    connection: ConnectionCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a connection request from the current user to another user.

    Responds with 404 when the target user does not exist, and with 400 when
    the target is the current user or when a connection between the two users
    already exists in either direction.
    """
    target_id = connection.receiver_id

    # The target user must exist before anything else is checked.
    receiver = db.query(User).filter(User.id == target_id).first()
    if receiver is None:
        raise HTTPException(status_code=404, detail="User not found")

    # A user cannot send a request to themselves.
    if target_id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot connect to yourself")

    # A pair of users may share only one connection row, whichever side sent it.
    outgoing = (Connection.sender_id == current_user.id) & (
        Connection.receiver_id == target_id
    )
    incoming = (Connection.sender_id == target_id) & (
        Connection.receiver_id == current_user.id
    )
    duplicate = db.query(Connection).filter(outgoing | incoming).first()
    if duplicate is not None:
        raise HTTPException(status_code=400, detail="Connection already exists")

    new_request = Connection(sender_id=current_user.id, receiver_id=target_id)
    db.add(new_request)
    db.commit()
    db.refresh(new_request)

    # Display names are attached as plain attributes after the refresh;
    # presumably ConnectionResponse reads them -- confirm against the schema.
    new_request.sender_name = f"{current_user.first_name} {current_user.last_name}"
    new_request.receiver_name = f"{receiver.first_name} {receiver.last_name}"
    return new_request
@router.get("/sent", response_model=List[ConnectionResponse])
def get_sent_connections(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List every connection whose sender is the current user.

    No status filter is applied, so connections in any status are returned.
    """
    sent_query = db.query(Connection).filter(
        Connection.sender_id == current_user.id
    )
    sent = sent_query.all()

    for item in sent:
        # The current user is the sender of every row in this list.
        item.sender_name = f"{current_user.first_name} {current_user.last_name}"
        other = item.receiver
        item.receiver_name = f"{other.first_name} {other.last_name}"

    return sent
@router.get("/received", response_model=List[ConnectionResponse])
def get_received_connections(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List every connection whose receiver is the current user.

    No status filter is applied, so connections in any status are returned.
    """
    received_query = db.query(Connection).filter(
        Connection.receiver_id == current_user.id
    )
    received = received_query.all()

    for item in received:
        other = item.sender
        item.sender_name = f"{other.first_name} {other.last_name}"
        # The current user is the receiver of every row in this list.
        item.receiver_name = f"{current_user.first_name} {current_user.last_name}"

    return received
@router.get("/accepted", response_model=List[ConnectionResponse])
def get_my_connections(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List accepted connections in which the current user is sender or receiver."""
    involves_me = (Connection.sender_id == current_user.id) | (
        Connection.receiver_id == current_user.id
    )
    is_accepted = Connection.status == "accepted"
    accepted = db.query(Connection).filter(involves_me & is_accepted).all()

    for item in accepted:
        # The current user may be on either side, so both names come from the
        # row's own relationships.
        sender, receiver = item.sender, item.receiver
        item.sender_name = f"{sender.first_name} {sender.last_name}"
        item.receiver_name = f"{receiver.first_name} {receiver.last_name}"

    return accepted
@router.put("/{connection_id}", response_model=ConnectionResponse)
def update_connection(
    connection_id: int,
    connection_update: ConnectionUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Change the status of a pending connection request.

    Only the receiver of the request may update it. Responds with 404 when the
    connection does not exist, 403 when the caller is not its receiver, and 400
    when the connection is no longer pending.
    """
    target = db.query(Connection).filter(Connection.id == connection_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Connection not found")

    # The sender cannot change the status of their own request.
    if target.receiver_id != current_user.id:
        raise HTTPException(
            status_code=403, detail="Not authorized to update this connection"
        )

    # Once a request has left the pending state it can no longer be changed here.
    if target.status != "pending":
        raise HTTPException(
            status_code=400, detail="Can only update pending connections"
        )

    # The new value is stored as given; which values are allowed depends on
    # ConnectionUpdate, which is not visible here.
    target.status = connection_update.status
    db.commit()
    db.refresh(target)

    requester = target.sender
    target.sender_name = f"{requester.first_name} {requester.last_name}"
    # The receiver check above guarantees the current user is the receiver.
    target.receiver_name = f"{current_user.first_name} {current_user.last_name}"
    return target
@router.delete("/{connection_id}")
def delete_connection(
    connection_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a connection in which the current user is sender or receiver.

    Responds with 404 when the connection does not exist and 403 when the
    caller is neither its sender nor its receiver. The connection's status is
    not checked.
    """
    doomed = db.query(Connection).filter(Connection.id == connection_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Connection not found")

    # Either side of the connection may remove it; nobody else can.
    is_participant = (
        doomed.sender_id == current_user.id
        or doomed.receiver_id == current_user.id
    )
    if not is_participant:
        raise HTTPException(
            status_code=403, detail="Not authorized to delete this connection"
        )

    db.delete(doomed)
    db.commit()
    return {"message": "Connection deleted successfully"}