Implement LinkedIn-based church management system with FastAPI

- Complete FastAPI application with authentication and JWT tokens
- SQLite database with SQLAlchemy ORM and Alembic migrations
- User management with profile features and search functionality
- LinkedIn-style networking with connection requests and acceptance
- Social features: posts, likes, comments, announcements, prayer requests
- Event management with registration system and capacity limits
- RESTful API endpoints for all features with proper authorization
- Comprehensive documentation and setup instructions

Key Features:
- JWT-based authentication with bcrypt password hashing
- User profiles with bio, position, contact information
- Connection system for church member networking
- Community feed with post interactions
- Event creation, registration, and attendance tracking
- Admin role-based permissions
- Health check endpoint and API documentation

Environment Variables Required:
- SECRET_KEY: JWT secret key for token generation
This commit is contained in:
Automated Action 2025-07-01 12:28:10 +00:00
parent a08e94c27e
commit 771ee5214f
28 changed files with 1739 additions and 2 deletions

186
README.md
View File

@ -1,3 +1,185 @@
# FastAPI Application
# LinkedIn-Based Church Management System
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive church management system built with FastAPI that incorporates LinkedIn-style networking features to help church members connect, share updates, and manage events.
## Features
### Authentication & User Management
- User registration and login with JWT tokens
- Profile management with bio, position, and contact information
- User search functionality
- Profile pictures and personal information
### LinkedIn-Style Networking
- Send connection requests to other church members
- Accept/reject connection requests
- View connections and networking activity
- Professional-style member profiles
### Social Features
- Create and share posts (announcements, prayer requests, general updates)
- Like and comment on posts
- Community feed with chronological posts
- Different post types (announcements, prayer requests)
### Event Management
- Create and manage church events
- Event registration system with capacity limits
- Public/private event settings
- Event attendance tracking
- RSVP management
### Church Member Management
- Member directory with search capabilities
- Role-based permissions (admin/member)
- Member profile management
- Contact information management
## Tech Stack
- **Backend**: FastAPI (Python)
- **Database**: SQLite with SQLAlchemy ORM
- **Authentication**: JWT tokens with bcrypt password hashing
- **Migrations**: Alembic
- **Validation**: Pydantic
- **Code Quality**: Ruff for linting
## Installation & Setup
1. **Clone the repository**
```bash
git clone <repository-url>
cd linkedinbasedchurchmanagementsystem-7j9m7d
```
2. **Install dependencies**
```bash
pip install -r requirements.txt
```
3. **Set up environment variables**
Create a `.env` file in the root directory with:
```
SECRET_KEY=your-secret-key-here-change-this-in-production
```
4. **Run database migrations**
```bash
alembic upgrade head
```
5. **Start the application**
```bash
uvicorn main:app --reload
```
The application will be available at `http://localhost:8000`
## API Documentation
Once the server is running, you can access:
- **Swagger UI**: `http://localhost:8000/docs`
- **ReDoc**: `http://localhost:8000/redoc`
- **OpenAPI JSON**: `http://localhost:8000/openapi.json`
## API Endpoints
### Authentication
- `POST /auth/register` - Register a new user
- `POST /auth/token` - Login and get access token
- `GET /auth/me` - Get current user information
### Users
- `GET /users/` - Get all users (paginated)
- `GET /users/{user_id}` - Get specific user
- `PUT /users/me` - Update current user profile
- `GET /users/search/{query}` - Search users
### Posts
- `POST /posts/` - Create a new post
- `GET /posts/` - Get all posts (paginated)
- `GET /posts/{post_id}` - Get specific post
- `PUT /posts/{post_id}` - Update post
- `DELETE /posts/{post_id}` - Delete post
- `POST /posts/{post_id}/like` - Like a post
- `DELETE /posts/{post_id}/like` - Unlike a post
- `POST /posts/{post_id}/comments` - Add comment to post
- `GET /posts/{post_id}/comments` - Get post comments
### Events
- `POST /events/` - Create a new event
- `GET /events/` - Get all events
- `GET /events/{event_id}` - Get specific event
- `PUT /events/{event_id}` - Update event
- `DELETE /events/{event_id}` - Delete event
- `POST /events/{event_id}/register` - Register for event
- `DELETE /events/{event_id}/register` - Unregister from event
- `GET /events/{event_id}/registrations` - Get event registrations
### Connections
- `POST /connections/` - Send connection request
- `GET /connections/sent` - Get sent connection requests
- `GET /connections/received` - Get received connection requests
- `GET /connections/accepted` - Get accepted connections
- `PUT /connections/{connection_id}` - Accept/reject connection
- `DELETE /connections/{connection_id}` - Remove connection
## Environment Variables
The following environment variables need to be set:
- `SECRET_KEY` - JWT secret key for token generation (required)
- `PORT` - Port number for the application (default: 8000)
## Database
The application uses SQLite as the database, stored at `/app/storage/db/db.sqlite`. The database schema includes:
- **Users**: User accounts with profiles and authentication
- **Connections**: LinkedIn-style connections between users
- **Posts**: Social media style posts with likes and comments
- **Events**: Church events with registration management
- **Comments**: Comments on posts
- **Likes**: Like system for posts
- **Event Registrations**: Event attendance tracking
## Health Check
The application provides a health check endpoint at `/health` that returns the service status and version information.
## Development
### Running with Auto-reload
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
### Code Quality
The project uses Ruff for code formatting and linting:
```bash
ruff check .
ruff format .
```
### Database Migrations
To create a new migration:
```bash
alembic revision --autogenerate -m "Description of changes"
```
To apply migrations:
```bash
alembic upgrade head
```
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Run tests and linting
5. Submit a pull request
## License
This project is licensed under the MIT License.

42
alembic.ini Normal file
View File

@ -0,0 +1,42 @@
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

51
alembic/env.py Normal file
View File

@ -0,0 +1,51 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import sys
import os
# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.db.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

26
alembic/script.py.mako Normal file
View File

@ -0,0 +1,26 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,186 @@
"""Initial migration
Revision ID: 001
Revises:
Create Date: 2025-07-01 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "001"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create users table
op.create_table(
"users",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("email", sa.String(), nullable=False),
sa.Column("first_name", sa.String(), nullable=False),
sa.Column("last_name", sa.String(), nullable=False),
sa.Column("hashed_password", sa.String(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=True),
sa.Column("is_admin", sa.Boolean(), nullable=True),
sa.Column("profile_picture", sa.String(), nullable=True),
sa.Column("bio", sa.Text(), nullable=True),
sa.Column("position", sa.String(), nullable=True),
sa.Column("phone", sa.String(), nullable=True),
sa.Column("address", sa.Text(), nullable=True),
sa.Column("date_joined", sa.DateTime(), nullable=False),
sa.Column("last_login", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_users_email"), "users", ["email"], unique=True)
op.create_index(op.f("ix_users_id"), "users", ["id"], unique=False)
# Create connections table
op.create_table(
"connections",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("sender_id", sa.Integer(), nullable=False),
sa.Column("receiver_id", sa.Integer(), nullable=False),
sa.Column("status", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["receiver_id"],
["users.id"],
),
sa.ForeignKeyConstraint(
["sender_id"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_connections_id"), "connections", ["id"], unique=False)
# Create posts table
op.create_table(
"posts",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("author_id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("image_url", sa.String(), nullable=True),
sa.Column("is_announcement", sa.Boolean(), nullable=True),
sa.Column("is_prayer_request", sa.Boolean(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["author_id"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_posts_id"), "posts", ["id"], unique=False)
# Create events table
op.create_table(
"events",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("location", sa.String(), nullable=True),
sa.Column("start_date", sa.DateTime(), nullable=False),
sa.Column("end_date", sa.DateTime(), nullable=True),
sa.Column("max_attendees", sa.Integer(), nullable=True),
sa.Column("is_public", sa.Boolean(), nullable=True),
sa.Column("image_url", sa.String(), nullable=True),
sa.Column("created_by", sa.Integer(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["created_by"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_events_id"), "events", ["id"], unique=False)
# Create comments table
op.create_table(
"comments",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("post_id", sa.Integer(), nullable=False),
sa.Column("author_id", sa.Integer(), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["author_id"],
["users.id"],
),
sa.ForeignKeyConstraint(
["post_id"],
["posts.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_comments_id"), "comments", ["id"], unique=False)
# Create likes table
op.create_table(
"likes",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("post_id", sa.Integer(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["post_id"],
["posts.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_likes_id"), "likes", ["id"], unique=False)
# Create event_registrations table
op.create_table(
"event_registrations",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("event_id", sa.Integer(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("status", sa.String(), nullable=True),
sa.Column("registered_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["event_id"],
["events.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_event_registrations_id"), "event_registrations", ["id"], unique=False
)
def downgrade() -> None:
op.drop_index(op.f("ix_event_registrations_id"), table_name="event_registrations")
op.drop_table("event_registrations")
op.drop_index(op.f("ix_likes_id"), table_name="likes")
op.drop_table("likes")
op.drop_index(op.f("ix_comments_id"), table_name="comments")
op.drop_table("comments")
op.drop_index(op.f("ix_events_id"), table_name="events")
op.drop_table("events")
op.drop_index(op.f("ix_posts_id"), table_name="posts")
op.drop_table("posts")
op.drop_index(op.f("ix_connections_id"), table_name="connections")
op.drop_table("connections")
op.drop_index(op.f("ix_users_id"), table_name="users")
op.drop_index(op.f("ix_users_email"), table_name="users")
op.drop_table("users")

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

100
app/api/auth.py Normal file
View File

@ -0,0 +1,100 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from app.db.base import get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse, Token
from app.core.security import (
verify_password,
get_password_hash,
create_access_token,
verify_token,
)
router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
def get_user_by_email(db: Session, email: str):
return db.query(User).filter(User.email == email).first()
def create_user(db: Session, user: UserCreate):
hashed_password = get_password_hash(user.password)
db_user = User(
email=user.email,
first_name=user.first_name,
last_name=user.last_name,
hashed_password=hashed_password,
position=user.position,
phone=user.phone,
address=user.address,
bio=user.bio,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def authenticate_user(db: Session, email: str, password: str):
user = get_user_by_email(db, email)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
async def get_current_user(
token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
email = verify_token(token)
if email is None:
raise credentials_exception
user = get_user_by_email(db, email=email)
if user is None:
raise credentials_exception
return user
@router.post("/register", response_model=UserResponse)
def register(user: UserCreate, db: Session = Depends(get_db)):
db_user = get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return create_user(db=db, user=user)
@router.post("/token", response_model=Token)
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)
):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=30)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
# Update last login
user.last_login = datetime.utcnow()
db.commit()
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=UserResponse)
def read_users_me(current_user: User = Depends(get_current_user)):
return current_user

182
app/api/connections.py Normal file
View File

@ -0,0 +1,182 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.base import get_db
from app.models.user import User, Connection
from app.schemas.connection import (
ConnectionCreate,
ConnectionResponse,
ConnectionUpdate,
)
from app.api.auth import get_current_user
router = APIRouter()
@router.post("/", response_model=ConnectionResponse)
def send_connection_request(
connection: ConnectionCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
# Check if receiver exists
receiver = db.query(User).filter(User.id == connection.receiver_id).first()
if not receiver:
raise HTTPException(status_code=404, detail="User not found")
# Check if trying to connect to self
if connection.receiver_id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot connect to yourself")
# Check if connection already exists
existing_connection = (
db.query(Connection)
.filter(
(
(Connection.sender_id == current_user.id)
& (Connection.receiver_id == connection.receiver_id)
)
| (
(Connection.sender_id == connection.receiver_id)
& (Connection.receiver_id == current_user.id)
)
)
.first()
)
if existing_connection:
raise HTTPException(status_code=400, detail="Connection already exists")
db_connection = Connection(
sender_id=current_user.id, receiver_id=connection.receiver_id
)
db.add(db_connection)
db.commit()
db.refresh(db_connection)
db_connection.sender_name = f"{current_user.first_name} {current_user.last_name}"
db_connection.receiver_name = f"{receiver.first_name} {receiver.last_name}"
return db_connection
@router.get("/sent", response_model=List[ConnectionResponse])
def get_sent_connections(
db: Session = Depends(get_db), current_user: User = Depends(get_current_user)
):
connections = (
db.query(Connection).filter(Connection.sender_id == current_user.id).all()
)
for connection in connections:
connection.sender_name = f"{current_user.first_name} {current_user.last_name}"
connection.receiver_name = (
f"{connection.receiver.first_name} {connection.receiver.last_name}"
)
return connections
@router.get("/received", response_model=List[ConnectionResponse])
def get_received_connections(
db: Session = Depends(get_db), current_user: User = Depends(get_current_user)
):
connections = (
db.query(Connection).filter(Connection.receiver_id == current_user.id).all()
)
for connection in connections:
connection.sender_name = (
f"{connection.sender.first_name} {connection.sender.last_name}"
)
connection.receiver_name = f"{current_user.first_name} {current_user.last_name}"
return connections
@router.get("/accepted", response_model=List[ConnectionResponse])
def get_my_connections(
db: Session = Depends(get_db), current_user: User = Depends(get_current_user)
):
connections = (
db.query(Connection)
.filter(
(
(Connection.sender_id == current_user.id)
| (Connection.receiver_id == current_user.id)
)
& (Connection.status == "accepted")
)
.all()
)
for connection in connections:
connection.sender_name = (
f"{connection.sender.first_name} {connection.sender.last_name}"
)
connection.receiver_name = (
f"{connection.receiver.first_name} {connection.receiver.last_name}"
)
return connections
@router.put("/{connection_id}", response_model=ConnectionResponse)
def update_connection(
connection_id: int,
connection_update: ConnectionUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
connection = db.query(Connection).filter(Connection.id == connection_id).first()
if not connection:
raise HTTPException(status_code=404, detail="Connection not found")
# Only receiver can update the connection status
if connection.receiver_id != current_user.id:
raise HTTPException(
status_code=403, detail="Not authorized to update this connection"
)
# Only allow updating pending connections
if connection.status != "pending":
raise HTTPException(
status_code=400, detail="Can only update pending connections"
)
connection.status = connection_update.status
db.commit()
db.refresh(connection)
connection.sender_name = (
f"{connection.sender.first_name} {connection.sender.last_name}"
)
connection.receiver_name = f"{current_user.first_name} {current_user.last_name}"
return connection
@router.delete("/{connection_id}")
def delete_connection(
connection_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
connection = db.query(Connection).filter(Connection.id == connection_id).first()
if not connection:
raise HTTPException(status_code=404, detail="Connection not found")
# Only sender or receiver can delete the connection
if (
connection.sender_id != current_user.id
and connection.receiver_id != current_user.id
):
raise HTTPException(
status_code=403, detail="Not authorized to delete this connection"
)
db.delete(connection)
db.commit()
return {"message": "Connection deleted successfully"}

223
app/api/events.py Normal file
View File

@ -0,0 +1,223 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.base import get_db
from app.models.user import User
from app.models.event import Event, EventRegistration
from app.schemas.event import (
EventCreate,
EventUpdate,
EventResponse,
EventRegistrationResponse,
)
from app.api.auth import get_current_user
router = APIRouter()
@router.post("/", response_model=EventResponse)
def create_event(
event: EventCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
db_event = Event(**event.dict(), created_by=current_user.id)
db.add(db_event)
db.commit()
db.refresh(db_event)
return db_event
@router.get("/", response_model=List[EventResponse])
def get_events(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
events = db.query(Event).filter(Event.is_public).offset(skip).limit(limit).all()
# Add additional info for each event
for event in events:
event.creator_name = f"{event.creator.first_name} {event.creator.last_name}"
event.registered_count = (
db.query(EventRegistration)
.filter(EventRegistration.event_id == event.id)
.count()
)
event.is_registered = (
db.query(EventRegistration)
.filter(
EventRegistration.event_id == event.id,
EventRegistration.user_id == current_user.id,
)
.first()
is not None
)
return events
@router.get("/{event_id}", response_model=EventResponse)
def get_event(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
event = db.query(Event).filter(Event.id == event_id).first()
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
event.creator_name = f"{event.creator.first_name} {event.creator.last_name}"
event.registered_count = (
db.query(EventRegistration)
.filter(EventRegistration.event_id == event.id)
.count()
)
event.is_registered = (
db.query(EventRegistration)
.filter(
EventRegistration.event_id == event.id,
EventRegistration.user_id == current_user.id,
)
.first()
is not None
)
return event
@router.put("/{event_id}", response_model=EventResponse)
def update_event(
event_id: int,
event_update: EventUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
event = db.query(Event).filter(Event.id == event_id).first()
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.created_by != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=403, detail="Not authorized to update this event"
)
for field, value in event_update.dict(exclude_unset=True).items():
setattr(event, field, value)
db.commit()
db.refresh(event)
return event
@router.delete("/{event_id}")
def delete_event(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
event = db.query(Event).filter(Event.id == event_id).first()
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.created_by != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=403, detail="Not authorized to delete this event"
)
db.delete(event)
db.commit()
return {"message": "Event deleted successfully"}
@router.post("/{event_id}/register", response_model=EventRegistrationResponse)
def register_for_event(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
event = db.query(Event).filter(Event.id == event_id).first()
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
# Check if already registered
existing_registration = (
db.query(EventRegistration)
.filter(
EventRegistration.event_id == event_id,
EventRegistration.user_id == current_user.id,
)
.first()
)
if existing_registration:
raise HTTPException(status_code=400, detail="Already registered for this event")
# Check if event is full
if event.max_attendees:
current_count = (
db.query(EventRegistration)
.filter(EventRegistration.event_id == event_id)
.count()
)
if current_count >= event.max_attendees:
raise HTTPException(status_code=400, detail="Event is full")
registration = EventRegistration(event_id=event_id, user_id=current_user.id)
db.add(registration)
db.commit()
db.refresh(registration)
registration.user_name = f"{current_user.first_name} {current_user.last_name}"
return registration
@router.delete("/{event_id}/register")
def unregister_from_event(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
registration = (
db.query(EventRegistration)
.filter(
EventRegistration.event_id == event_id,
EventRegistration.user_id == current_user.id,
)
.first()
)
if registration is None:
raise HTTPException(status_code=404, detail="Registration not found")
db.delete(registration)
db.commit()
return {"message": "Unregistered successfully"}
@router.get("/{event_id}/registrations", response_model=List[EventRegistrationResponse])
def get_event_registrations(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
event = db.query(Event).filter(Event.id == event_id).first()
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.created_by != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=403, detail="Not authorized to view registrations"
)
registrations = (
db.query(EventRegistration).filter(EventRegistration.event_id == event_id).all()
)
for registration in registrations:
registration.user_name = (
f"{registration.user.first_name} {registration.user.last_name}"
)
return registrations

202
app/api/posts.py Normal file
View File

@ -0,0 +1,202 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.base import get_db
from app.models.user import User
from app.models.post import Post, Comment, Like
from app.schemas.post import (
PostCreate,
PostUpdate,
PostResponse,
CommentCreate,
CommentResponse,
)
from app.api.auth import get_current_user
router = APIRouter()
@router.post("/", response_model=PostResponse)
def create_post(
post: PostCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
db_post = Post(**post.dict(), author_id=current_user.id)
db.add(db_post)
db.commit()
db.refresh(db_post)
db_post.author_name = f"{current_user.first_name} {current_user.last_name}"
db_post.likes_count = 0
db_post.comments_count = 0
return db_post
@router.get("/", response_model=List[PostResponse])
def get_posts(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
posts = (
db.query(Post).order_by(Post.created_at.desc()).offset(skip).limit(limit).all()
)
for post in posts:
post.author_name = f"{post.author.first_name} {post.author.last_name}"
post.likes_count = db.query(Like).filter(Like.post_id == post.id).count()
post.comments_count = (
db.query(Comment).filter(Comment.post_id == post.id).count()
)
return posts
@router.get("/{post_id}", response_model=PostResponse)
def get_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
post.author_name = f"{post.author.first_name} {post.author.last_name}"
post.likes_count = db.query(Like).filter(Like.post_id == post.id).count()
post.comments_count = db.query(Comment).filter(Comment.post_id == post.id).count()
return post
@router.put("/{post_id}", response_model=PostResponse)
def update_post(
post_id: int,
post_update: PostUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.author_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=403, detail="Not authorized to update this post"
)
for field, value in post_update.dict(exclude_unset=True).items():
setattr(post, field, value)
db.commit()
db.refresh(post)
return post
@router.delete("/{post_id}")
def delete_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.author_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=403, detail="Not authorized to delete this post"
)
db.delete(post)
db.commit()
return {"message": "Post deleted successfully"}
@router.post("/{post_id}/like")
def like_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
existing_like = (
db.query(Like)
.filter(Like.post_id == post_id, Like.user_id == current_user.id)
.first()
)
if existing_like:
raise HTTPException(status_code=400, detail="Already liked this post")
like = Like(post_id=post_id, user_id=current_user.id)
db.add(like)
db.commit()
return {"message": "Post liked successfully"}
@router.delete("/{post_id}/like")
def unlike_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
like = (
db.query(Like)
.filter(Like.post_id == post_id, Like.user_id == current_user.id)
.first()
)
if not like:
raise HTTPException(status_code=404, detail="Like not found")
db.delete(like)
db.commit()
return {"message": "Post unliked successfully"}
@router.post("/{post_id}/comments", response_model=CommentResponse)
def create_comment(
post_id: int,
comment: CommentCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
db_comment = Comment(**comment.dict(), post_id=post_id, author_id=current_user.id)
db.add(db_comment)
db.commit()
db.refresh(db_comment)
db_comment.author_name = f"{current_user.first_name} {current_user.last_name}"
return db_comment
@router.get("/{post_id}/comments", response_model=List[CommentResponse])
def get_comments(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
comments = (
db.query(Comment)
.filter(Comment.post_id == post_id)
.order_by(Comment.created_at.asc())
.all()
)
for comment in comments:
comment.author_name = f"{comment.author.first_name} {comment.author.last_name}"
return comments

68
app/api/users.py Normal file
View File

@ -0,0 +1,68 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.base import get_db
from app.models.user import User
from app.schemas.user import UserResponse, UserUpdate
from app.api.auth import get_current_user
router = APIRouter()
@router.get("/", response_model=List[UserResponse])
def get_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
users = db.query(User).filter(User.is_active).offset(skip).limit(limit).all()
return users
@router.get("/{user_id}", response_model=UserResponse)
def get_user(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
user = db.query(User).filter(User.id == user_id, User.is_active).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.put("/me", response_model=UserResponse)
def update_my_profile(
user_update: UserUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
for field, value in user_update.dict(exclude_unset=True).items():
setattr(current_user, field, value)
db.commit()
db.refresh(current_user)
return current_user
@router.get("/search/{query}", response_model=List[UserResponse])
def search_users(
query: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
users = (
db.query(User)
.filter(
User.is_active,
(
User.first_name.contains(query)
| User.last_name.contains(query)
| User.email.contains(query)
| User.position.contains(query)
),
)
.limit(20)
.all()
)
return users

0
app/core/__init__.py Normal file
View File

7
app/core/config.py Normal file
View File

@ -0,0 +1,7 @@
from decouple import config
SECRET_KEY = config(
"SECRET_KEY", default="your-secret-key-here-change-this-in-production"
)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

37
app/core/security.py Normal file
View File

@ -0,0 +1,37 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
return None
return email
except JWTError:
return None

0
app/db/__init__.py Normal file
View File

25
app/db/base.py Normal file
View File

@ -0,0 +1,25 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pathlib import Path
DB_DIR = Path("/app/storage/db")
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

0
app/models/__init__.py Normal file
View File

43
app/models/event.py Normal file
View File

@ -0,0 +1,43 @@
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.base import Base
class Event(Base):
__tablename__ = "events"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
description = Column(Text, nullable=True)
location = Column(String, nullable=True)
start_date = Column(DateTime, nullable=False)
end_date = Column(DateTime, nullable=True)
max_attendees = Column(Integer, nullable=True)
is_public = Column(Boolean, default=True)
image_url = Column(String, nullable=True)
created_by = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime, nullable=False, default=func.now())
updated_at = Column(
DateTime, nullable=False, default=func.now(), onupdate=func.now()
)
# Relationships
creator = relationship("User")
registrations = relationship(
"EventRegistration", back_populates="event", cascade="all, delete-orphan"
)
class EventRegistration(Base):
__tablename__ = "event_registrations"
id = Column(Integer, primary_key=True, index=True)
event_id = Column(Integer, ForeignKey("events.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
status = Column(String, default="registered") # registered, attended, cancelled
registered_at = Column(DateTime, nullable=False, default=func.now())
# Relationships
event = relationship("Event", back_populates="registrations")
user = relationship("User", back_populates="event_registrations")

57
app/models/post.py Normal file
View File

@ -0,0 +1,57 @@
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.base import Base
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
title = Column(String, nullable=False)
content = Column(Text, nullable=False)
image_url = Column(String, nullable=True)
is_announcement = Column(Boolean, default=False)
is_prayer_request = Column(Boolean, default=False)
created_at = Column(DateTime, nullable=False, default=func.now())
updated_at = Column(
DateTime, nullable=False, default=func.now(), onupdate=func.now()
)
# Relationships
author = relationship("User", back_populates="posts")
comments = relationship(
"Comment", back_populates="post", cascade="all, delete-orphan"
)
likes = relationship("Like", back_populates="post", cascade="all, delete-orphan")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
content = Column(Text, nullable=False)
created_at = Column(DateTime, nullable=False, default=func.now())
updated_at = Column(
DateTime, nullable=False, default=func.now(), onupdate=func.now()
)
# Relationships
post = relationship("Post", back_populates="comments")
author = relationship("User", back_populates="comments")
class Like(Base):
__tablename__ = "likes"
id = Column(Integer, primary_key=True, index=True)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime, nullable=False, default=func.now())
# Relationships
post = relationship("Post", back_populates="likes")
user = relationship("User", back_populates="likes")

56
app/models/user.py Normal file
View File

@ -0,0 +1,56 @@
from sqlalchemy import Boolean, Column, Integer, String, DateTime, Text, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
first_name = Column(String, nullable=False)
last_name = Column(String, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
profile_picture = Column(String, nullable=True)
bio = Column(Text, nullable=True)
position = Column(String, nullable=True) # Church position/role
phone = Column(String, nullable=True)
address = Column(Text, nullable=True)
date_joined = Column(DateTime, nullable=False, default=func.now())
last_login = Column(DateTime, nullable=True)
# Relationships
sent_connections = relationship(
"Connection", foreign_keys="Connection.sender_id", back_populates="sender"
)
received_connections = relationship(
"Connection", foreign_keys="Connection.receiver_id", back_populates="receiver"
)
posts = relationship("Post", back_populates="author")
comments = relationship("Comment", back_populates="author")
likes = relationship("Like", back_populates="user")
event_registrations = relationship("EventRegistration", back_populates="user")
class Connection(Base):
__tablename__ = "connections"
id = Column(Integer, primary_key=True, index=True)
sender_id = Column(Integer, ForeignKey("users.id"), nullable=False)
receiver_id = Column(Integer, ForeignKey("users.id"), nullable=False)
status = Column(String, default="pending") # pending, accepted, rejected
created_at = Column(DateTime, nullable=False, default=func.now())
updated_at = Column(
DateTime, nullable=False, default=func.now(), onupdate=func.now()
)
# Relationships
sender = relationship(
"User", foreign_keys=[sender_id], back_populates="sent_connections"
)
receiver = relationship(
"User", foreign_keys=[receiver_id], back_populates="received_connections"
)

0
app/schemas/__init__.py Normal file
View File

29
app/schemas/connection.py Normal file
View File

@ -0,0 +1,29 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
class ConnectionBase(BaseModel):
receiver_id: int
class ConnectionCreate(ConnectionBase):
pass
class ConnectionResponse(BaseModel):
id: int
sender_id: int
receiver_id: int
status: str
created_at: datetime
updated_at: datetime
sender_name: Optional[str] = None
receiver_name: Optional[str] = None
class Config:
from_attributes = True
class ConnectionUpdate(BaseModel):
status: str # accepted or rejected

54
app/schemas/event.py Normal file
View File

@ -0,0 +1,54 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
class EventBase(BaseModel):
title: str
description: Optional[str] = None
location: Optional[str] = None
start_date: datetime
end_date: Optional[datetime] = None
max_attendees: Optional[int] = None
is_public: bool = True
image_url: Optional[str] = None
class EventCreate(EventBase):
pass
class EventUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
location: Optional[str] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
max_attendees: Optional[int] = None
is_public: Optional[bool] = None
image_url: Optional[str] = None
class EventResponse(EventBase):
id: int
created_by: int
created_at: datetime
updated_at: datetime
creator_name: Optional[str] = None
registered_count: int = 0
is_registered: bool = False
class Config:
from_attributes = True
class EventRegistrationResponse(BaseModel):
id: int
event_id: int
user_id: int
status: str
registered_at: datetime
user_name: Optional[str] = None
class Config:
from_attributes = True

56
app/schemas/post.py Normal file
View File

@ -0,0 +1,56 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
class PostBase(BaseModel):
title: str
content: str
image_url: Optional[str] = None
is_announcement: bool = False
is_prayer_request: bool = False
class PostCreate(PostBase):
pass
class PostUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
image_url: Optional[str] = None
is_announcement: Optional[bool] = None
is_prayer_request: Optional[bool] = None
class PostResponse(PostBase):
id: int
author_id: int
created_at: datetime
updated_at: datetime
author_name: Optional[str] = None
likes_count: int = 0
comments_count: int = 0
class Config:
from_attributes = True
class CommentBase(BaseModel):
content: str
class CommentCreate(CommentBase):
pass
class CommentResponse(CommentBase):
id: int
post_id: int
author_id: int
author_name: Optional[str] = None
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True

48
app/schemas/user.py Normal file
View File

@ -0,0 +1,48 @@
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
email: EmailStr
first_name: str
last_name: str
position: Optional[str] = None
phone: Optional[str] = None
address: Optional[str] = None
bio: Optional[str] = None
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
first_name: Optional[str] = None
last_name: Optional[str] = None
position: Optional[str] = None
phone: Optional[str] = None
address: Optional[str] = None
bio: Optional[str] = None
profile_picture: Optional[str] = None
class UserResponse(UserBase):
id: int
is_active: bool
is_admin: bool
profile_picture: Optional[str] = None
date_joined: datetime
last_login: Optional[datetime] = None
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None

52
main.py Normal file
View File

@ -0,0 +1,52 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api import auth, users, posts, events, connections
import os
app = FastAPI(
title="LinkedIn-Based Church Management System",
description="A church management system with LinkedIn-style networking features",
version="1.0.0",
openapi_url="/openapi.json",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth.router, prefix="/auth", tags=["Authentication"])
app.include_router(users.router, prefix="/users", tags=["Users"])
app.include_router(posts.router, prefix="/posts", tags=["Posts"])
app.include_router(events.router, prefix="/events", tags=["Events"])
app.include_router(connections.router, prefix="/connections", tags=["Connections"])
@app.get("/")
async def root():
return {
"title": "LinkedIn-Based Church Management System",
"documentation": "/docs",
"health": "/health",
}
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"service": "LinkedIn-Based Church Management System",
"version": "1.0.0",
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app", host="0.0.0.0", port=int(os.getenv("PORT", 8000)), reload=True
)

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
alembic==1.12.1
python-multipart==0.0.6
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-decouple==3.8
ruff==0.1.6
pydantic==2.5.0
email-validator==2.1.0