Build Chat Application Backend with FastAPI and SQLite

- Set up project structure with FastAPI and SQLite
- Implement user authentication using JWT
- Create database models for users, conversations, and messages
- Implement API endpoints for user management and chat functionality
- Set up WebSocket for real-time messaging
- Add database migrations with Alembic
- Create health check endpoint
- Update README with comprehensive documentation

generated with BackendIM... (backend.im)
This commit is contained in:
Automated Action 2025-05-12 16:37:35 +00:00
parent 63c6aa3dfc
commit 16cfcd783e
39 changed files with 1726 additions and 2 deletions

208
README.md
View File

@ -1,3 +1,207 @@
# FastAPI Application
# Chat Application Backend
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A real-time chat application backend built with FastAPI and SQLite, featuring WebSocket support for instant messaging.
## Features
- User authentication with JWT
- User profile management
- Real-time messaging with WebSockets
- Support for direct messages and group chats
- Message read status tracking
- Typing indicators
- RESTful API for chat functionality
- Database migrations with Alembic
- Health check endpoint
## Tech Stack
- **Framework**: FastAPI
- **Database**: SQLite
- **ORM**: SQLAlchemy
- **Authentication**: JWT (JSON Web Tokens)
- **Real-time Communication**: WebSockets
- **Migration**: Alembic
## Project Structure
```
.
├── alembic/ # Database migration files
│ ├── versions/ # Migration version scripts
│ ├── env.py # Alembic environment configuration
│ └── script.py.mako # Migration script template
├── app/ # Application package
│ ├── api/ # API endpoints
│ │ ├── dependencies/ # Endpoint dependencies (auth, etc.)
│ │ └── endpoints/ # API route definitions
│ ├── core/ # Core application components
│ │ ├── config.py # Application configuration
│ │ └── security.py # Security utilities
│ ├── db/ # Database components
│ │ ├── repositories/ # Database CRUD operations
│ │ └── session.py # Database session management
│ ├── models/ # SQLAlchemy ORM models
│ ├── schemas/ # Pydantic schemas for validation
│ ├── services/ # Business logic services
│ └── utils/ # Utility functions
├── main.py # Application entry point
├── alembic.ini # Alembic configuration
└── requirements.txt # Project dependencies
```
## API Endpoints
### Authentication
- `POST /api/v1/auth/signup` - Register a new user
- `POST /api/v1/auth/login` - Login and get access token
- `GET /api/v1/auth/me` - Get current user information
### User Management
- `GET /api/v1/users` - List all users
- `GET /api/v1/users/{user_id}` - Get user details
- `PUT /api/v1/users/me` - Update current user profile
### Conversations
- `POST /api/v1/conversations` - Create a new conversation
- `GET /api/v1/conversations` - List user's conversations
- `GET /api/v1/conversations/{conversation_id}` - Get conversation details
- `PUT /api/v1/conversations/{conversation_id}` - Update conversation details
### Messages
- `POST /api/v1/conversations/{conversation_id}/messages` - Send a message
- `GET /api/v1/conversations/{conversation_id}/messages` - Get conversation messages
- `PUT /api/v1/messages/{message_id}/read` - Mark a message as read
- `GET /api/v1/messages/unread/count` - Get unread message count
### WebSocket
- `WebSocket /api/v1/ws` - Real-time messaging WebSocket endpoint
### Health Check
- `GET /health` - Application health check endpoint
## Getting Started
### Prerequisites
- Python 3.8+
- SQLite
### Installation
1. Clone the repository
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run database migrations:
```bash
alembic upgrade head
```
4. Start the server:
```bash
uvicorn main:app --reload
```
The API will be available at http://localhost:8000 with documentation at http://localhost:8000/docs.
## WebSocket Communication Protocol
The WebSocket endpoint supports the following message types:
### Authentication
Client sends:
```json
{
"token": "your_jwt_token"
}
```
### Send Message
Client sends:
```json
{
"type": "message",
"conversation_id": "conversation_uuid",
"content": "Hello, world!",
"recipient_id": "recipient_uuid" // Optional, for direct messages
}
```
### Mark Message as Read
Client sends:
```json
{
"type": "mark_read",
"message_id": "message_uuid"
}
```
### Typing Indicator
Client sends:
```json
{
"type": "typing",
"conversation_id": "conversation_uuid"
}
```
### Server Push Notifications
Server sends:
1. New message:
```json
{
"type": "new_message",
"message": {
"id": "message_uuid",
"content": "Hello, world!",
"sender_id": "sender_uuid",
"recipient_id": "recipient_uuid",
"conversation_id": "conversation_uuid",
"is_read": false,
"created_at": "2023-05-12T10:30:00Z",
"updated_at": "2023-05-12T10:30:00Z"
},
"timestamp": "2023-05-12T10:30:00Z"
}
```
2. Message read confirmation:
```json
{
"type": "message_read",
"message_id": "message_uuid",
"timestamp": "2023-05-12T10:31:00Z"
}
```
3. User typing notification:
```json
{
"type": "user_typing",
"user_id": "user_uuid",
"conversation_id": "conversation_uuid",
"timestamp": "2023-05-12T10:32:00Z"
}
```
## License
This project is licensed under the MIT License.

104
alembic.ini Normal file
View File

@ -0,0 +1,104 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:///./app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

83
alembic/env.py Normal file
View File

@ -0,0 +1,83 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.core.config import settings
from app.db.session import Base
from app.models import User, Conversation, Message, conversation_participants
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Override sqlalchemy.url from settings
config.set_main_option("sqlalchemy.url", settings.SQLALCHEMY_DATABASE_URL)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
alembic/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,85 @@
"""Initial migration
Revision ID: 001
Revises:
Create Date: 2025-05-12
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create users table
op.create_table(
'users',
sa.Column('id', sa.String(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True, default=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
# Create conversations table
op.create_table(
'conversations',
sa.Column('id', sa.String(), nullable=False),
sa.Column('name', sa.String(), nullable=True),
sa.Column('is_group', sa.String(), nullable=True, default=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_conversations_id'), 'conversations', ['id'], unique=False)
# Create conversation_participants table
op.create_table(
'conversation_participants',
sa.Column('conversation_id', sa.String(), nullable=False),
sa.Column('user_id', sa.String(), nullable=False),
sa.ForeignKeyConstraint(['conversation_id'], ['conversations.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('conversation_id', 'user_id')
)
# Create messages table
op.create_table(
'messages',
sa.Column('id', sa.String(), nullable=False),
sa.Column('content', sa.String(), nullable=False),
sa.Column('sender_id', sa.String(), nullable=False),
sa.Column('recipient_id', sa.String(), nullable=True),
sa.Column('conversation_id', sa.String(), nullable=False),
sa.Column('is_read', sa.Boolean(), nullable=True, default=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.ForeignKeyConstraint(['conversation_id'], ['conversations.id'], ),
sa.ForeignKeyConstraint(['recipient_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['sender_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_messages_id'), 'messages', ['id'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_messages_id'), table_name='messages')
op.drop_table('messages')
op.drop_table('conversation_participants')
op.drop_index(op.f('ix_conversations_id'), table_name='conversations')
op.drop_table('conversations')
op.drop_index(op.f('ix_users_username'), table_name='users')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@ -0,0 +1,50 @@
from typing import Generator, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import verify_password
from app.db.repositories.user import user_repository
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import TokenPayload
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=["HS256"]
)
token_data = TokenPayload(**payload)
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = user_repository.get(db, id=token_data.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
if not user_repository.is_active(current_user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user",
)
return current_user

View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

81
app/api/endpoints/auth.py Normal file
View File

@ -0,0 +1,81 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.dependencies.auth import get_current_user
from app.core.config import settings
from app.core.security import create_access_token
from app.db.repositories.user import user_repository
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import User as UserSchema, UserCreate
router = APIRouter(tags=["authentication"])
@router.post("/auth/login", response_model=Token)
def login(
db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
Get the JWT for a user with data from OAuth2 request form body.
"""
user = user_repository.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect email or password",
)
if not user_repository.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(user.id, expires_delta=access_token_expires),
"token_type": "bearer",
}
@router.post("/auth/signup", response_model=UserSchema)
def create_user(
*,
db: Session = Depends(get_db),
user_in: UserCreate,
) -> Any:
"""
Create new user.
"""
user = user_repository.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists",
)
user = user_repository.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
user = user_repository.create(db, obj_in=user_in)
return user
@router.get("/auth/me", response_model=UserSchema)
def read_users_me(
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get current user.
"""
return current_user

View File

@ -0,0 +1,116 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.dependencies.auth import get_current_active_user
from app.db.repositories.conversation import conversation_repository
from app.db.repositories.user import user_repository
from app.db.session import get_db
from app.models.user import User
from app.schemas.conversation import Conversation, ConversationCreate, ConversationUpdate
router = APIRouter(tags=["conversations"])
@router.post("/conversations", response_model=Conversation)
def create_conversation(
*,
db: Session = Depends(get_db),
conversation_in: ConversationCreate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Create new conversation.
"""
# Check if direct conversation between these two users already exists
if len(conversation_in.participant_ids) == 1 and not conversation_in.is_group:
other_user_id = conversation_in.participant_ids[0]
existing_conversation = conversation_repository.get_conversation_between_users(
db, user_id_1=current_user.id, user_id_2=other_user_id
)
if existing_conversation:
return existing_conversation
# Check if all participants exist
for user_id in conversation_in.participant_ids:
user = user_repository.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found",
)
conversation = conversation_repository.create_with_participants(
db, obj_in=conversation_in, creator_id=current_user.id
)
return conversation
@router.get("/conversations", response_model=List[Conversation])
def read_conversations(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve conversations.
"""
conversations = conversation_repository.get_user_conversations(
db, user_id=current_user.id, skip=skip, limit=limit
)
return conversations
@router.get("/conversations/{conversation_id}", response_model=Conversation)
def read_conversation(
conversation_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get a specific conversation by id.
"""
conversation = conversation_repository.get(db, id=conversation_id)
if not conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Conversation not found",
)
# Check if user is a participant in this conversation
if current_user not in conversation.participants:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a participant of this conversation",
)
return conversation
@router.put("/conversations/{conversation_id}", response_model=Conversation)
def update_conversation(
*,
conversation_id: str,
conversation_in: ConversationUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update a conversation.
"""
conversation = conversation_repository.get(db, id=conversation_id)
if not conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Conversation not found",
)
# Check if user is a participant in this conversation
if current_user not in conversation.participants:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a participant of this conversation",
)
conversation = conversation_repository.update(
db, db_obj=conversation, obj_in=conversation_in
)
return conversation

View File

@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.db.session import get_db
router = APIRouter(tags=["health"])
@router.get("/health")
def health_check(db: Session = Depends(get_db)):
"""
Health check endpoint to verify API is up and running and can connect to the database
"""
try:
# Try to connect to the database
db.execute("SELECT 1")
db_status = "up"
except Exception:
db_status = "down"
return {
"status": "ok",
"database": db_status
}

View File

@ -0,0 +1,111 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.dependencies.auth import get_current_active_user
from app.db.repositories.message import message_repository
from app.db.repositories.conversation import conversation_repository
from app.db.session import get_db
from app.models.user import User
from app.schemas.message import Message, MessageCreate, MessageUpdate
router = APIRouter(tags=["messages"])
@router.post("/conversations/{conversation_id}/messages", response_model=Message)
def create_message(
*,
conversation_id: str,
message_in: MessageCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Create new message in a conversation.
"""
# Check if conversation exists and user is a participant
conversation = conversation_repository.get(db, id=conversation_id)
if not conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Conversation not found",
)
# Check if user is a participant in this conversation
if current_user not in conversation.participants:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a participant of this conversation",
)
# Add conversation_id to message if not already provided
if not message_in.conversation_id:
message_in.conversation_id = conversation_id
message = message_repository.create_with_sender(
db, obj_in=message_in, sender_id=current_user.id
)
return message
@router.get("/conversations/{conversation_id}/messages", response_model=List[Message])
def read_messages(
conversation_id: str,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve messages from a conversation.
"""
# Check if conversation exists and user is a participant
conversation = conversation_repository.get(db, id=conversation_id)
if not conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Conversation not found",
)
# Check if user is a participant in this conversation
if current_user not in conversation.participants:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a participant of this conversation",
)
messages = message_repository.get_conversation_messages(
db, conversation_id=conversation_id, skip=skip, limit=limit
)
return messages
@router.put("/messages/{message_id}/read", response_model=Message)
def mark_message_as_read(
message_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Mark a message as read.
"""
message = message_repository.mark_as_read(
db, message_id=message_id, user_id=current_user.id
)
if not message:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Message not found or you're not the recipient",
)
return message
@router.get("/messages/unread/count")
def get_unread_count(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get count of unread messages for the current user.
"""
count = message_repository.get_unread_count(db, user_id=current_user.id)
return {"count": count}

View File

@ -0,0 +1,55 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.dependencies.auth import get_current_active_user
from app.db.repositories.user import user_repository
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import User as UserSchema, UserUpdate
router = APIRouter(tags=["users"])
@router.get("/users", response_model=List[UserSchema])
def read_users(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve users.
"""
users = user_repository.get_multi(db, skip=skip, limit=limit)
return users
@router.get("/users/{user_id}", response_model=UserSchema)
def read_user(
user_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get a specific user by id.
"""
user = user_repository.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return user
@router.put("/users/me", response_model=UserSchema)
def update_user_me(
*,
db: Session = Depends(get_db),
user_in: UserUpdate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update own user.
"""
user = user_repository.update(db, db_obj=current_user, obj_in=user_in)
return user

View File

@ -0,0 +1,237 @@
import json
from typing import Dict, List, Any
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect, HTTPException, status
from sqlalchemy.orm import Session
from jose import jwt, JWTError
from app.core.config import settings
from app.db.session import get_db
from app.db.repositories.user import user_repository
from app.db.repositories.message import message_repository
from app.db.repositories.conversation import conversation_repository
from app.schemas.message import MessageCreate
router = APIRouter(tags=["websocket"])
# Store connected websocket clients
# Map of user_id -> WebSocket
connected_users: Dict[str, WebSocket] = {}
async def get_user_from_token(token: str, db: Session) -> Any:
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
user_id = payload.get("sub")
if user_id is None:
return None
user = user_repository.get(db, id=user_id)
return user
except JWTError:
return None
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, db: Session = Depends(get_db)):
await websocket.accept()
user = None
try:
# Authenticate user
auth_message = await websocket.receive_text()
auth_data = json.loads(auth_message)
token = auth_data.get("token")
if not token:
await websocket.send_json({"error": "Authentication required"})
await websocket.close()
return
user = await get_user_from_token(token, db)
if not user:
await websocket.send_json({"error": "Invalid authentication token"})
await websocket.close()
return
# Store the connection
connected_users[user.id] = websocket
# Notify user that connection is established
await websocket.send_json({
"type": "connection_established",
"user_id": user.id,
"timestamp": datetime.utcnow().isoformat()
})
# Send unread message count
unread_count = message_repository.get_unread_count(db, user_id=user.id)
await websocket.send_json({
"type": "unread_count",
"count": unread_count,
"timestamp": datetime.utcnow().isoformat()
})
# Handle incoming messages
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
# Expected message format:
# {
# "type": "message",
# "conversation_id": "...",
# "content": "...",
# "recipient_id": "..." (optional, for direct messages)
# }
if message_data.get("type") == "message":
conversation_id = message_data.get("conversation_id")
content = message_data.get("content")
recipient_id = message_data.get("recipient_id")
if not conversation_id or not content:
await websocket.send_json({
"type": "error",
"message": "Missing required fields",
"timestamp": datetime.utcnow().isoformat()
})
continue
# Check if conversation exists and user is a participant
conversation = conversation_repository.get(db, id=conversation_id)
if not conversation:
await websocket.send_json({
"type": "error",
"message": "Conversation not found",
"timestamp": datetime.utcnow().isoformat()
})
continue
if user not in conversation.participants:
await websocket.send_json({
"type": "error",
"message": "Not a participant of this conversation",
"timestamp": datetime.utcnow().isoformat()
})
continue
# Create message
message_in = MessageCreate(
content=content,
conversation_id=conversation_id,
recipient_id=recipient_id
)
message = message_repository.create_with_sender(
db, obj_in=message_in, sender_id=user.id
)
# Prepare message data for sending to clients
message_out = {
"type": "new_message",
"message": {
"id": message.id,
"content": message.content,
"sender_id": message.sender_id,
"recipient_id": message.recipient_id,
"conversation_id": message.conversation_id,
"is_read": message.is_read,
"created_at": message.created_at.isoformat(),
"updated_at": message.updated_at.isoformat(),
},
"timestamp": datetime.utcnow().isoformat()
}
# Send message to the sender for confirmation
await websocket.send_json(message_out)
# Send message to other participants who are connected
for participant in conversation.participants:
if participant.id != user.id and participant.id in connected_users:
try:
await connected_users[participant.id].send_json(message_out)
except Exception:
# Handle errors in sending to a particular client
pass
elif message_data.get("type") == "mark_read":
message_id = message_data.get("message_id")
if not message_id:
await websocket.send_json({
"type": "error",
"message": "Missing message_id field",
"timestamp": datetime.utcnow().isoformat()
})
continue
message = message_repository.mark_as_read(
db, message_id=message_id, user_id=user.id
)
if not message:
await websocket.send_json({
"type": "error",
"message": "Message not found or you're not the recipient",
"timestamp": datetime.utcnow().isoformat()
})
continue
# Notify user that message is marked as read
await websocket.send_json({
"type": "message_read",
"message_id": message_id,
"timestamp": datetime.utcnow().isoformat()
})
# Notify sender that message is read if they're connected
if message.sender_id in connected_users:
try:
await connected_users[message.sender_id].send_json({
"type": "message_read_by_recipient",
"message_id": message_id,
"read_by": user.id,
"timestamp": datetime.utcnow().isoformat()
})
except Exception:
# Handle errors in sending to a particular client
pass
elif message_data.get("type") == "typing":
conversation_id = message_data.get("conversation_id")
if not conversation_id:
await websocket.send_json({
"type": "error",
"message": "Missing conversation_id field",
"timestamp": datetime.utcnow().isoformat()
})
continue
# Check if conversation exists and user is a participant
conversation = conversation_repository.get(db, id=conversation_id)
if not conversation or user not in conversation.participants:
continue
# Notify other participants that user is typing
for participant in conversation.participants:
if participant.id != user.id and participant.id in connected_users:
try:
await connected_users[participant.id].send_json({
"type": "user_typing",
"user_id": user.id,
"conversation_id": conversation_id,
"timestamp": datetime.utcnow().isoformat()
})
except Exception:
# Handle errors in sending to a particular client
pass
except WebSocketDisconnect:
# Remove user from connected users
if user and user.id in connected_users:
del connected_users[user.id]
except Exception as e:
# Handle any other exceptions
if user and user.id in connected_users:
del connected_users[user.id]

1
app/core/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

26
app/core/config.py Normal file
View File

@ -0,0 +1,26 @@
from pathlib import Path
import secrets
from typing import List
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Chat Application Backend"
# Security
SECRET_KEY: str = secrets.token_urlsafe(32)
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
# CORS
CORS_ORIGINS: List[str] = ["*"]
# Database
DB_DIR = Path("/app/storage/db")
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
class Config:
case_sensitive = True
settings = Settings()

29
app/core/security.py Normal file
View File

@ -0,0 +1,29 @@
from datetime import datetime, timedelta
from typing import Any, Union, Optional
import uuid
from passlib.context import CryptContext
from jose import jwt
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256")
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def generate_uuid() -> str:
return str(uuid.uuid4())

1
app/db/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@ -0,0 +1,57 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.session import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType, **kwargs) -> ModelType:
obj_in_data = obj_in.dict()
for key, value in kwargs.items():
obj_in_data[key] = value
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: ModelType, obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = db_obj.__dict__
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: Any) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

View File

@ -0,0 +1,74 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.core.security import generate_uuid
from app.db.repositories.base import BaseRepository
from app.models.conversation import Conversation, conversation_participants
from app.models.user import User
from app.schemas.conversation import ConversationCreate, ConversationUpdate
class ConversationRepository(BaseRepository[Conversation, ConversationCreate, ConversationUpdate]):
def create_with_participants(
self, db: Session, *, obj_in: ConversationCreate, creator_id: str
) -> Conversation:
conversation_id = generate_uuid()
# Create conversation
db_obj = Conversation(
id=conversation_id,
name=obj_in.name,
is_group=obj_in.is_group
)
db.add(db_obj)
db.flush()
# Add creator to participants if not already included
participant_ids = set(obj_in.participant_ids)
if creator_id not in participant_ids:
participant_ids.add(creator_id)
# Add participants
for user_id in participant_ids:
user = db.query(User).filter(User.id == user_id).first()
if user:
db_obj.participants.append(user)
db.commit()
db.refresh(db_obj)
return db_obj
def get_user_conversations(
self, db: Session, *, user_id: str, skip: int = 0, limit: int = 100
) -> List[Conversation]:
return (
db.query(Conversation)
.join(conversation_participants)
.filter(conversation_participants.c.user_id == user_id)
.offset(skip)
.limit(limit)
.all()
)
def get_conversation_between_users(
self, db: Session, *, user_id_1: str, user_id_2: str
) -> Optional[Conversation]:
# Find direct conversations (not groups) where both users are participants
conversations = (
db.query(Conversation)
.filter(Conversation.is_group == False)
.join(conversation_participants, Conversation.id == conversation_participants.c.conversation_id)
.filter(conversation_participants.c.user_id.in_([user_id_1, user_id_2]))
.all()
)
# Check which conversations have exactly these two users
for conversation in conversations:
participant_ids = [participant.id for participant in conversation.participants]
if set(participant_ids) == {user_id_1, user_id_2}:
return conversation
return None
conversation_repository = ConversationRepository(Conversation)

View File

@ -0,0 +1,70 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.core.security import generate_uuid
from app.db.repositories.base import BaseRepository
from app.models.message import Message
from app.schemas.message import MessageCreate, MessageUpdate
class MessageRepository(BaseRepository[Message, MessageCreate, MessageUpdate]):
def create_with_sender(
self, db: Session, *, obj_in: MessageCreate, sender_id: str
) -> Message:
message_id = generate_uuid()
db_obj = Message(
id=message_id,
content=obj_in.content,
sender_id=sender_id,
recipient_id=obj_in.recipient_id,
conversation_id=obj_in.conversation_id,
is_read=False
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_conversation_messages(
self, db: Session, *, conversation_id: str, skip: int = 0, limit: int = 100
) -> List[Message]:
return (
db.query(Message)
.filter(Message.conversation_id == conversation_id)
.order_by(Message.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
def mark_as_read(
self, db: Session, *, message_id: str, user_id: str
) -> Optional[Message]:
message = (
db.query(Message)
.filter(Message.id == message_id)
.filter(Message.recipient_id == user_id)
.first()
)
if message and not message.is_read:
message.is_read = True
db.add(message)
db.commit()
db.refresh(message)
return message
def get_unread_count(
self, db: Session, *, user_id: str
) -> int:
return (
db.query(Message)
.filter(Message.recipient_id == user_id)
.filter(Message.is_read == False)
.count()
)
message_repository = MessageRepository(Message)

View File

@ -0,0 +1,45 @@
from typing import Optional
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password, generate_uuid
from app.db.repositories.base import BaseRepository
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class UserRepository(BaseRepository[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_by_username(self, db: Session, *, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
user_id = generate_uuid()
hashed_password = get_password_hash(obj_in.password)
db_obj = User(
id=user_id,
username=obj_in.username,
email=obj_in.email,
hashed_password=hashed_password,
is_active=obj_in.is_active
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
user_repository = UserRepository(User)

21
app/db/session.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

3
app/models/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from app.models.user import User
from app.models.conversation import Conversation, conversation_participants
from app.models.message import Message

View File

@ -0,0 +1,25 @@
from sqlalchemy import Column, String, DateTime, func, ForeignKey, Table
from sqlalchemy.orm import relationship
from app.db.session import Base
# Association table for many-to-many relationship between users and conversations
conversation_participants = Table(
"conversation_participants",
Base.metadata,
Column("conversation_id", String, ForeignKey("conversations.id"), primary_key=True),
Column("user_id", String, ForeignKey("users.id"), primary_key=True)
)
class Conversation(Base):
__tablename__ = "conversations"
id = Column(String, primary_key=True, index=True)
name = Column(String, nullable=True) # Optional name for group chats
is_group = Column(String, default=False)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
# Relationships
participants = relationship("User", secondary=conversation_participants, back_populates="conversations")
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")

21
app/models/message.py Normal file
View File

@ -0,0 +1,21 @@
from sqlalchemy import Column, String, DateTime, func, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from app.db.session import Base
class Message(Base):
__tablename__ = "messages"
id = Column(String, primary_key=True, index=True)
content = Column(String)
sender_id = Column(String, ForeignKey("users.id"))
recipient_id = Column(String, ForeignKey("users.id"), nullable=True) # For direct messages
conversation_id = Column(String, ForeignKey("conversations.id"))
is_read = Column(Boolean, default=False)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
# Relationships
sender = relationship("User", foreign_keys=[sender_id], back_populates="messages_sent")
recipient = relationship("User", foreign_keys=[recipient_id], back_populates="messages_received")
conversation = relationship("Conversation", back_populates="messages")

20
app/models/user.py Normal file
View File

@ -0,0 +1,20 @@
from sqlalchemy import Boolean, Column, String, DateTime, func
from sqlalchemy.orm import relationship
from app.db.session import Base
class User(Base):
__tablename__ = "users"
id = Column(String, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
# Relationships
messages_sent = relationship("Message", back_populates="sender", foreign_keys="Message.sender_id")
messages_received = relationship("Message", back_populates="recipient", foreign_keys="Message.recipient_id")
conversations = relationship("Conversation", secondary="conversation_participants", back_populates="participants")

3
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from app.schemas.user import User, UserCreate, UserUpdate, UserInDB
from app.schemas.message import Message, MessageCreate, MessageUpdate, MessageInDB
from app.schemas.conversation import Conversation, ConversationCreate, ConversationUpdate, ConversationInDB

View File

@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel
from app.schemas.user import User
class ConversationBase(BaseModel):
name: Optional[str] = None
is_group: bool = False
class ConversationCreate(ConversationBase):
participant_ids: List[str]
class ConversationUpdate(BaseModel):
name: Optional[str] = None
class ConversationInDBBase(ConversationBase):
id: str
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class Conversation(ConversationInDBBase):
participants: List[User]
class ConversationInDB(ConversationInDBBase):
pass

32
app/schemas/message.py Normal file
View File

@ -0,0 +1,32 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class MessageBase(BaseModel):
content: str
recipient_id: Optional[str] = None # Optional for group chats
class MessageCreate(MessageBase):
conversation_id: str
class MessageUpdate(BaseModel):
content: Optional[str] = None
is_read: Optional[bool] = None
class MessageInDBBase(MessageBase):
id: str
sender_id: str
conversation_id: str
is_read: bool
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class Message(MessageInDBBase):
pass
class MessageInDB(MessageInDBBase):
pass

10
app/schemas/token.py Normal file
View File

@ -0,0 +1,10 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[str] = None

32
app/schemas/user.py Normal file
View File

@ -0,0 +1,32 @@
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field
class UserBase(BaseModel):
username: str
email: EmailStr
is_active: Optional[bool] = True
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
is_active: Optional[bool] = None
class UserInDBBase(UserBase):
id: str
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class User(UserInDBBase):
pass
class UserInDB(UserInDBBase):
hashed_password: str

1
app/services/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

1
app/utils/__init__.py Normal file
View File

@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

33
main.py Normal file
View File

@ -0,0 +1,33 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.endpoints import users, auth, messages, health, websocket
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description="Chat Application Backend API",
version="0.1.0",
openapi_url=f"{settings.API_V1_STR}/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(health.router)
app.include_router(auth.router, prefix=settings.API_V1_STR)
app.include_router(users.router, prefix=settings.API_V1_STR)
app.include_router(messages.router, prefix=settings.API_V1_STR)
app.include_router(websocket.router, prefix=settings.API_V1_STR)
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
fastapi>=0.95.0
uvicorn>=0.21.1
pydantic>=1.10.7
sqlalchemy>=2.0.9
alembic>=1.10.3
python-jose>=3.3.0
passlib>=1.7.4
bcrypt>=4.0.1
python-multipart>=0.0.6
pydantic-settings>=2.0.2
websockets>=11.0.3