Implement healthcare management system backend with FastAPI and SQLite

This commit is contained in:
Automated Action 2025-06-17 09:18:26 +00:00
parent 1ee0890ca7
commit f91bd18d96
49 changed files with 2241 additions and 2 deletions

189
README.md
View File

@ -1,3 +1,188 @@
# FastAPI Application
# Healthcare Management System Backend
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive backend API for a healthcare management system built with FastAPI and SQLite.
## Features
- User Authentication and Authorization
- Patient Management
- Doctor Management
- Appointment Scheduling
- Medical Records Management
- Doctor Availability Scheduling
## Tech Stack
- **Framework**: FastAPI
- **Database**: SQLite with SQLAlchemy ORM
- **Migrations**: Alembic
- **Authentication**: JWT Tokens
- **Password Hashing**: BCrypt
- **Linting**: Ruff
## Project Structure
```
├── alembic.ini # Alembic configuration
├── main.py # Application entry point
├── app/ # Main application package
│ ├── api/ # API endpoints
│ │ ├── api_v1/ # API version 1
│ │ │ ├── endpoints/ # API endpoint modules
│ │ │ │ ├── login.py # Authentication endpoints
│ │ │ │ ├── users.py # User management endpoints
│ │ │ │ ├── patients.py # Patient management endpoints
│ │ │ │ ├── doctors.py # Doctor management endpoints
│ │ │ │ └── appointments.py # Appointment management endpoints
│ │ │ └── api.py # API router
│ │ └── deps.py # Dependency injection
│ ├── core/ # Core application modules
│ │ ├── config.py # Application configuration
│ │ └── security.py # Security utilities
│ ├── crud/ # CRUD operations
│ │ ├── base.py # Base CRUD class
│ │ ├── crud_user.py # User CRUD operations
│ │ ├── crud_patient.py # Patient CRUD operations
│ │ ├── crud_doctor.py # Doctor CRUD operations
│ │ └── crud_appointment.py # Appointment CRUD operations
│ ├── db/ # Database setup
│ │ ├── base.py # Base class for models
│ │ ├── base_class.py # Imports all models for Alembic
│ │ └── session.py # Database session setup
│ ├── models/ # SQLAlchemy models
│ │ ├── user.py # User model
│ │ ├── patient.py # Patient model
│ │ ├── doctor.py # Doctor model
│ │ ├── appointment.py # Appointment model
│ │ ├── medical_record.py # Medical record model
│ │ └── doctor_schedule.py # Doctor schedule model
│ └── schemas/ # Pydantic models (schemas)
│ ├── user.py # User schemas
│ ├── patient.py # Patient schemas
│ ├── doctor.py # Doctor schemas
│ ├── appointment.py # Appointment schemas
│ ├── medical_record.py # Medical record schemas
│ ├── doctor_schedule.py # Doctor schedule schemas
│ └── token.py # Token schemas
└── migrations/ # Alembic migrations
├── versions/ # Migration versions
│ └── 001_initial_migration.py # Initial migration
├── env.py # Alembic environment
├── README # Alembic README
└── script.py.mako # Alembic script template
```
## Environment Variables
The application uses the following environment variables:
| Name | Description | Default Value |
|------|-------------|---------------|
| SECRET_KEY | Secret key for JWT token generation and verification | development_secret_key |
| ACCESS_TOKEN_EXPIRE_MINUTES | JWT token expiration time in minutes | 60 |
| JWT_SECRET | Secret key for JWT token generation | Same as SECRET_KEY |
## Getting Started
### Prerequisites
- Python 3.8+
- pip (Python package installer)
### Installation
1. Clone the repository:
```bash
git clone https://github.com/yourusername/healthcare-management-system-backend.git
cd healthcare-management-system-backend
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Initialize the database:
```bash
# Create the database directory
mkdir -p /app/storage/db
# Run the migrations
alembic upgrade head
```
4. Start the application:
```bash
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
```
## API Documentation
Once the application is running, you can access the interactive API documentation at:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## API Endpoints
### Authentication
- **POST /api/v1/login/access-token**: Get access token (OAuth2 compatible)
### Users
- **GET /api/v1/users/**: Get all users (admin only)
- **POST /api/v1/users/**: Create a new user (admin only)
- **GET /api/v1/users/me**: Get current user information
- **PUT /api/v1/users/me**: Update current user information
- **GET /api/v1/users/{user_id}**: Get user by ID
- **PUT /api/v1/users/{user_id}**: Update user by ID (admin only)
### Patients
- **GET /api/v1/patients/**: Get all patients
- **POST /api/v1/patients/**: Create a new patient
- **GET /api/v1/patients/{patient_id}**: Get patient by ID
- **PUT /api/v1/patients/{patient_id}**: Update patient by ID
- **DELETE /api/v1/patients/{patient_id}**: Delete patient by ID (admin only)
### Doctors
- **GET /api/v1/doctors/**: Get all doctors
- **POST /api/v1/doctors/**: Create a new doctor
- **GET /api/v1/doctors/{doctor_id}**: Get doctor by ID
- **PUT /api/v1/doctors/{doctor_id}**: Update doctor by ID
- **DELETE /api/v1/doctors/{doctor_id}**: Delete doctor by ID (admin only)
### Appointments
- **GET /api/v1/appointments/**: Get all appointments
- **POST /api/v1/appointments/**: Create a new appointment
- **GET /api/v1/appointments/{appointment_id}**: Get appointment by ID
- **PUT /api/v1/appointments/{appointment_id}**: Update appointment by ID
- **DELETE /api/v1/appointments/{appointment_id}**: Delete appointment by ID
- **POST /api/v1/appointments/{appointment_id}/status**: Update appointment status
## Development
### Running Tests
```bash
pytest
```
### Linting
```bash
ruff check .
```
### Autofixing Linting Issues
```bash
ruff check --fix .
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.

105
alembic.ini Normal file
View File

@ -0,0 +1,105 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

View File

10
app/api/api_v1/api.py Normal file
View File

@ -0,0 +1,10 @@
from fastapi import APIRouter
from app.api.api_v1.endpoints import users, patients, doctors, appointments, login
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(patients.router, prefix="/patients", tags=["patients"])
api_router.include_router(doctors.router, prefix="/doctors", tags=["doctors"])
api_router.include_router(appointments.router, prefix="/appointments", tags=["appointments"])

View File

View File

@ -0,0 +1,291 @@
from datetime import timedelta
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.models.appointment import AppointmentStatus
router = APIRouter()
@router.get("/", response_model=List[schemas.appointment.Appointment])
def read_appointments(
db: Session = Depends(deps.get_db),
doctor_id: Optional[int] = Query(None, description="Filter appointments by doctor"),
patient_id: Optional[int] = Query(None, description="Filter appointments by patient"),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve appointments.
"""
# Determine what appointments the user can see
if crud.crud_user.user.is_superuser(current_user):
# Superusers can see all appointments with optional filtering
if doctor_id:
appointments = crud.crud_appointment.appointment.get_by_doctor(
db, doctor_id=doctor_id, skip=skip, limit=limit
)
elif patient_id:
appointments = crud.crud_appointment.appointment.get_by_patient(
db, patient_id=patient_id, skip=skip, limit=limit
)
else:
appointments = crud.crud_appointment.appointment.get_multi(
db, skip=skip, limit=limit
)
else:
# Regular users can only see their own appointments
if current_user.doctor:
# Doctors can see all their appointments
appointments = crud.crud_appointment.appointment.get_by_doctor(
db, doctor_id=current_user.doctor.id, skip=skip, limit=limit
)
elif current_user.patient:
# Patients can see all their appointments
appointments = crud.crud_appointment.appointment.get_by_patient(
db, patient_id=current_user.patient.id, skip=skip, limit=limit
)
else:
# Users without doctor or patient profiles can't see any appointments
appointments = []
return appointments
@router.post("/", response_model=schemas.appointment.Appointment)
def create_appointment(
*,
db: Session = Depends(deps.get_db),
appointment_in: schemas.appointment.AppointmentCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new appointment.
"""
# Check if doctor exists
doctor = crud.crud_doctor.doctor.get(db, id=appointment_in.doctor_id)
if not doctor:
raise HTTPException(
status_code=404,
detail="Doctor not found",
)
# Check if patient exists
patient = crud.crud_patient.patient.get(db, id=appointment_in.patient_id)
if not patient:
raise HTTPException(
status_code=404,
detail="Patient not found",
)
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Only the patient or the doctor involved can create an appointment
if current_user.patient:
if current_user.patient.id != appointment_in.patient_id:
raise HTTPException(
status_code=403,
detail="Not enough permissions to create an appointment for another patient",
)
elif current_user.doctor:
if current_user.doctor.id != appointment_in.doctor_id:
raise HTTPException(
status_code=403,
detail="Not enough permissions to create an appointment for another doctor",
)
else:
raise HTTPException(
status_code=403,
detail="Not enough permissions to create an appointment",
)
# Check for scheduling conflicts
appointment_end = appointment_in.appointment_datetime + timedelta(minutes=appointment_in.duration_minutes)
doctor_appointments = crud.crud_appointment.appointment.get_by_doctor_and_date_range(
db,
doctor_id=appointment_in.doctor_id,
start_date=appointment_in.appointment_datetime - timedelta(hours=1), # Buffer before
end_date=appointment_end + timedelta(hours=1), # Buffer after
)
for existing_appointment in doctor_appointments:
# Skip cancelled appointments
if existing_appointment.status == AppointmentStatus.CANCELLED:
continue
existing_end = existing_appointment.appointment_datetime + timedelta(minutes=existing_appointment.duration_minutes)
# Check for overlap
if (appointment_in.appointment_datetime < existing_end and
existing_appointment.appointment_datetime < appointment_end):
raise HTTPException(
status_code=400,
detail=f"Scheduling conflict with an existing appointment at {existing_appointment.appointment_datetime}",
)
appointment = crud.crud_appointment.appointment.create(db, obj_in=appointment_in)
return appointment
@router.get("/{appointment_id}", response_model=schemas.appointment.Appointment)
def read_appointment(
*,
db: Session = Depends(deps.get_db),
appointment_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get appointment by ID.
"""
appointment = crud.crud_appointment.appointment.get(db, id=appointment_id)
if not appointment:
raise HTTPException(status_code=404, detail="Appointment not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only see their own appointments
if current_user.patient:
if current_user.patient.id != appointment.patient_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
elif current_user.doctor:
if current_user.doctor.id != appointment.doctor_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
else:
raise HTTPException(status_code=403, detail="Not enough permissions")
return appointment
@router.put("/{appointment_id}", response_model=schemas.appointment.Appointment)
def update_appointment(
*,
db: Session = Depends(deps.get_db),
appointment_id: int,
appointment_in: schemas.appointment.AppointmentUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update an appointment.
"""
appointment = crud.crud_appointment.appointment.get(db, id=appointment_id)
if not appointment:
raise HTTPException(status_code=404, detail="Appointment not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only update their own appointments
if current_user.patient:
if current_user.patient.id != appointment.patient_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
elif current_user.doctor:
if current_user.doctor.id != appointment.doctor_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
else:
raise HTTPException(status_code=403, detail="Not enough permissions")
# If appointment_datetime is updated, check for scheduling conflicts
if appointment_in.appointment_datetime and appointment_in.appointment_datetime != appointment.appointment_datetime:
appointment_end = appointment_in.appointment_datetime + timedelta(
minutes=appointment_in.duration_minutes if appointment_in.duration_minutes else appointment.duration_minutes
)
doctor_appointments = crud.crud_appointment.appointment.get_by_doctor_and_date_range(
db,
doctor_id=appointment.doctor_id,
start_date=appointment_in.appointment_datetime - timedelta(hours=1),
end_date=appointment_end + timedelta(hours=1),
)
for existing_appointment in doctor_appointments:
# Skip the current appointment and cancelled appointments
if existing_appointment.id == appointment_id or existing_appointment.status == AppointmentStatus.CANCELLED:
continue
existing_end = existing_appointment.appointment_datetime + timedelta(minutes=existing_appointment.duration_minutes)
# Check for overlap
if (appointment_in.appointment_datetime < existing_end and
existing_appointment.appointment_datetime < appointment_end):
raise HTTPException(
status_code=400,
detail=f"Scheduling conflict with an existing appointment at {existing_appointment.appointment_datetime}",
)
appointment = crud.crud_appointment.appointment.update(
db, db_obj=appointment, obj_in=appointment_in
)
return appointment
@router.delete("/{appointment_id}", response_model=schemas.appointment.Appointment)
def delete_appointment(
*,
db: Session = Depends(deps.get_db),
appointment_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete an appointment.
"""
appointment = crud.crud_appointment.appointment.get(db, id=appointment_id)
if not appointment:
raise HTTPException(status_code=404, detail="Appointment not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only delete their own appointments
if current_user.patient:
if current_user.patient.id != appointment.patient_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
elif current_user.doctor:
if current_user.doctor.id != appointment.doctor_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
else:
raise HTTPException(status_code=403, detail="Not enough permissions")
appointment = crud.crud_appointment.appointment.remove(db, id=appointment_id)
return appointment
@router.post("/{appointment_id}/status", response_model=schemas.appointment.Appointment)
def update_appointment_status(
*,
db: Session = Depends(deps.get_db),
appointment_id: int,
status: AppointmentStatus,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update appointment status.
"""
appointment = crud.crud_appointment.appointment.get(db, id=appointment_id)
if not appointment:
raise HTTPException(status_code=404, detail="Appointment not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only update their own appointments
if current_user.patient:
if current_user.patient.id != appointment.patient_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
# Patients can only cancel their appointments
if status not in [AppointmentStatus.CANCELLED]:
raise HTTPException(
status_code=403,
detail="Patients can only cancel appointments"
)
elif current_user.doctor:
if current_user.doctor.id != appointment.doctor_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
else:
raise HTTPException(status_code=403, detail="Not enough permissions")
appointment = crud.crud_appointment.appointment.update_status(
db, db_obj=appointment, status=status
)
return appointment

View File

@ -0,0 +1,126 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.doctor.Doctor])
def read_doctors(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
specialty: Optional[str] = Query(None, description="Filter doctors by specialty"),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve doctors.
"""
if specialty:
doctors = crud.crud_doctor.doctor.get_multi_by_specialty(
db, specialty=specialty, skip=skip, limit=limit
)
else:
doctors = crud.crud_doctor.doctor.get_multi(db, skip=skip, limit=limit)
return doctors
@router.post("/", response_model=schemas.doctor.Doctor)
def create_doctor(
*,
db: Session = Depends(deps.get_db),
doctor_in: schemas.doctor.DoctorCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new doctor.
"""
# Check if user exists
user = crud.crud_user.user.get(db, id=doctor_in.user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this id does not exist in the system",
)
# Check if doctor already exists for this user
doctor = crud.crud_doctor.doctor.get_by_user_id(db, user_id=doctor_in.user_id)
if doctor:
raise HTTPException(
status_code=400,
detail="The doctor with this user_id already exists in the system",
)
# Check if license number is already registered
doctor = crud.crud_doctor.doctor.get_by_license_number(db, license_number=doctor_in.license_number)
if doctor:
raise HTTPException(
status_code=400,
detail="The doctor with this license_number already exists in the system",
)
doctor = crud.crud_doctor.doctor.create(db, obj_in=doctor_in)
return doctor
@router.get("/{doctor_id}", response_model=schemas.doctor.Doctor)
def read_doctor(
*,
db: Session = Depends(deps.get_db),
doctor_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get doctor by ID.
"""
doctor = crud.crud_doctor.doctor.get(db, id=doctor_id)
if not doctor:
raise HTTPException(status_code=404, detail="Doctor not found")
return doctor
@router.put("/{doctor_id}", response_model=schemas.doctor.Doctor)
def update_doctor(
*,
db: Session = Depends(deps.get_db),
doctor_id: int,
doctor_in: schemas.doctor.DoctorUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a doctor.
"""
doctor = crud.crud_doctor.doctor.get(db, id=doctor_id)
if not doctor:
raise HTTPException(status_code=404, detail="Doctor not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only update their own doctor data
if not current_user.doctor or current_user.doctor.id != doctor_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
doctor = crud.crud_doctor.doctor.update(db, db_obj=doctor, obj_in=doctor_in)
return doctor
@router.delete("/{doctor_id}", response_model=schemas.doctor.Doctor)
def delete_doctor(
*,
db: Session = Depends(deps.get_db),
doctor_id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a doctor.
"""
doctor = crud.crud_doctor.doctor.get(db, id=doctor_id)
if not doctor:
raise HTTPException(status_code=404, detail="Doctor not found")
doctor = crud.crud_doctor.doctor.remove(db, id=doctor_id)
return doctor

View File

@ -0,0 +1,41 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.token.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.crud_user.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
elif not crud.crud_user.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}

View File

@ -0,0 +1,138 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.patient.Patient])
def read_patients(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve patients.
"""
if crud.crud_user.user.is_superuser(current_user):
patients = crud.crud_patient.patient.get_multi(db, skip=skip, limit=limit)
else:
# Regular users can only access their own patient data
if current_user.patient:
patients = [current_user.patient]
else:
# If the user is a doctor, we could potentially show all their patients
if current_user.doctor:
# This would require a more complex query to get all patients that have appointments with this doctor
# For simplicity, we'll just return an empty list for now
patients = []
else:
patients = []
return patients
@router.post("/", response_model=schemas.patient.Patient)
def create_patient(
*,
db: Session = Depends(deps.get_db),
patient_in: schemas.patient.PatientCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new patient.
"""
# Check if user exists
user = crud.crud_user.user.get(db, id=patient_in.user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this id does not exist in the system",
)
# Check if patient already exists for this user
patient = crud.crud_patient.patient.get_by_user_id(db, user_id=patient_in.user_id)
if patient:
raise HTTPException(
status_code=400,
detail="The patient with this user_id already exists in the system",
)
patient = crud.crud_patient.patient.create(db, obj_in=patient_in)
return patient
@router.get("/{patient_id}", response_model=schemas.patient.Patient)
def read_patient(
*,
db: Session = Depends(deps.get_db),
patient_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get patient by ID.
"""
patient = crud.crud_patient.patient.get(db, id=patient_id)
if not patient:
raise HTTPException(status_code=404, detail="Patient not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only access their own patient data
if not current_user.patient or current_user.patient.id != patient_id:
# If the user is a doctor, we could potentially allow them to see their patients
if current_user.doctor:
# For simplicity, we'll allow doctors to see all patients for now
# In a real application, you would check if the patient has an appointment with this doctor
pass
else:
raise HTTPException(status_code=403, detail="Not enough permissions")
return patient
@router.put("/{patient_id}", response_model=schemas.patient.Patient)
def update_patient(
*,
db: Session = Depends(deps.get_db),
patient_id: int,
patient_in: schemas.patient.PatientUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update a patient.
"""
patient = crud.crud_patient.patient.get(db, id=patient_id)
if not patient:
raise HTTPException(status_code=404, detail="Patient not found")
# Check permissions
if not crud.crud_user.user.is_superuser(current_user):
# Regular users can only update their own patient data
if not current_user.patient or current_user.patient.id != patient_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
patient = crud.crud_patient.patient.update(db, db_obj=patient, obj_in=patient_in)
return patient
@router.delete("/{patient_id}", response_model=schemas.patient.Patient)
def delete_patient(
*,
db: Session = Depends(deps.get_db),
patient_id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a patient.
"""
patient = crud.crud_patient.patient.get(db, id=patient_id)
if not patient:
raise HTTPException(status_code=404, detail="Patient not found")
patient = crud.crud_patient.patient.remove(db, id=patient_id)
return patient

View File

@ -0,0 +1,120 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.user.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.crud_user.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.user.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.user.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.crud_user.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = crud.crud_user.user.create(db, obj_in=user_in)
return user
@router.put("/me", response_model=schemas.user.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.user.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.crud_user.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.user.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.get("/{user_id}", response_model=schemas.user.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.crud_user.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.crud_user.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.user.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.user.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.crud_user.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.crud_user.user.update(db, db_obj=user, obj_in=user_in)
return user

49
app/api/deps.py Normal file
View File

@ -0,0 +1,49 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import models, schemas
from app.core.config import settings
from app.db.session import get_db
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
try:
payload = jwt.decode(
token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
)
token_data = schemas.token.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = db.query(models.User).filter(models.User.id == token_data.sub).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not current_user.is_superuser:
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

0
app/core/__init__.py Normal file
View File

44
app/core/config.py Normal file
View File

@ -0,0 +1,44 @@
import os
from typing import List, Union
from pydantic import AnyHttpUrl, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
SECRET_KEY: str = os.environ.get("SECRET_KEY", "development_secret_key")
ACCESS_TOKEN_EXPIRE_MINUTES: int = int(os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", "60")) # 1 hour
# Project info
PROJECT_NAME: str = "Healthcare Management System"
PROJECT_DESCRIPTION: str = "Backend API for Healthcare Management System"
VERSION: str = "0.1.0"
# CORS
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
@field_validator("BACKEND_CORS_ORIGINS", mode="before")
def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
# JWT
JWT_SECRET: str = os.environ.get("JWT_SECRET", SECRET_KEY)
JWT_ALGORITHM: str = "HS256"
# Database
DB_DIR: str = "/app/storage/db"
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# SQLAlchemy config
SQLALCHEMY_TRACK_MODIFICATIONS: bool = False
SQLALCHEMY_ECHO: bool = False
model_config = SettingsConfigDict(case_sensitive=True)
settings = Settings()

31
app/core/security.py Normal file
View File

@ -0,0 +1,31 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

0
app/crud/__init__.py Normal file
View File

66
app/crud/base.py Normal file
View File

@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

View File

@ -0,0 +1,65 @@
from datetime import datetime
from typing import List
from sqlalchemy import and_
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.appointment import Appointment, AppointmentStatus
from app.schemas.appointment import AppointmentCreate, AppointmentUpdate
class CRUDAppointment(CRUDBase[Appointment, AppointmentCreate, AppointmentUpdate]):
def get_by_doctor_and_date_range(
self,
db: Session,
*,
doctor_id: int,
start_date: datetime,
end_date: datetime
) -> List[Appointment]:
return (
db.query(Appointment)
.filter(
and_(
Appointment.doctor_id == doctor_id,
Appointment.appointment_datetime >= start_date,
Appointment.appointment_datetime <= end_date
)
)
.all()
)
def get_by_patient(
self, db: Session, *, patient_id: int, skip: int = 0, limit: int = 100
) -> List[Appointment]:
return (
db.query(Appointment)
.filter(Appointment.patient_id == patient_id)
.offset(skip)
.limit(limit)
.all()
)
def get_by_doctor(
self, db: Session, *, doctor_id: int, skip: int = 0, limit: int = 100
) -> List[Appointment]:
return (
db.query(Appointment)
.filter(Appointment.doctor_id == doctor_id)
.offset(skip)
.limit(limit)
.all()
)
def update_status(
self, db: Session, *, db_obj: Appointment, status: AppointmentStatus
) -> Appointment:
db_obj.status = status
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
appointment = CRUDAppointment(Appointment)

29
app/crud/crud_doctor.py Normal file
View File

@ -0,0 +1,29 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.doctor import Doctor
from app.schemas.doctor import DoctorCreate, DoctorUpdate
class CRUDDoctor(CRUDBase[Doctor, DoctorCreate, DoctorUpdate]):
def get_by_user_id(self, db: Session, *, user_id: int) -> Optional[Doctor]:
return db.query(Doctor).filter(Doctor.user_id == user_id).first()
def get_by_license_number(self, db: Session, *, license_number: str) -> Optional[Doctor]:
return db.query(Doctor).filter(Doctor.license_number == license_number).first()
def get_multi_by_specialty(
self, db: Session, *, specialty: str, skip: int = 0, limit: int = 100
) -> List[Doctor]:
return (
db.query(Doctor)
.filter(Doctor.specialty == specialty)
.offset(skip)
.limit(limit)
.all()
)
doctor = CRUDDoctor(Doctor)

View File

@ -0,0 +1,43 @@
from typing import List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.doctor_schedule import DoctorSchedule, WeekDay
from app.schemas.doctor_schedule import DoctorScheduleCreate, DoctorScheduleUpdate
class CRUDDoctorSchedule(CRUDBase[DoctorSchedule, DoctorScheduleCreate, DoctorScheduleUpdate]):
def get_by_doctor(
self, db: Session, *, doctor_id: int, skip: int = 0, limit: int = 100
) -> List[DoctorSchedule]:
return (
db.query(DoctorSchedule)
.filter(DoctorSchedule.doctor_id == doctor_id)
.offset(skip)
.limit(limit)
.all()
)
def get_by_doctor_and_day(
self, db: Session, *, doctor_id: int, day: WeekDay
) -> List[DoctorSchedule]:
return (
db.query(DoctorSchedule)
.filter(DoctorSchedule.doctor_id == doctor_id, DoctorSchedule.day_of_week == day)
.all()
)
def get_available_doctors_by_day(
self, db: Session, *, day: WeekDay, skip: int = 0, limit: int = 100
) -> List[DoctorSchedule]:
return (
db.query(DoctorSchedule)
.filter(DoctorSchedule.day_of_week == day, DoctorSchedule.is_available)
.offset(skip)
.limit(limit)
.all()
)
doctor_schedule = CRUDDoctorSchedule(DoctorSchedule)

View File

@ -0,0 +1,34 @@
from typing import List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.medical_record import MedicalRecord
from app.schemas.medical_record import MedicalRecordCreate, MedicalRecordUpdate
class CRUDMedicalRecord(CRUDBase[MedicalRecord, MedicalRecordCreate, MedicalRecordUpdate]):
def get_by_patient(
self, db: Session, *, patient_id: int, skip: int = 0, limit: int = 100
) -> List[MedicalRecord]:
return (
db.query(MedicalRecord)
.filter(MedicalRecord.patient_id == patient_id)
.offset(skip)
.limit(limit)
.all()
)
def get_by_doctor(
self, db: Session, *, doctor_id: int, skip: int = 0, limit: int = 100
) -> List[MedicalRecord]:
return (
db.query(MedicalRecord)
.filter(MedicalRecord.doctor_id == doctor_id)
.offset(skip)
.limit(limit)
.all()
)
medical_record = CRUDMedicalRecord(MedicalRecord)

15
app/crud/crud_patient.py Normal file
View File

@ -0,0 +1,15 @@
from typing import Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.patient import Patient
from app.schemas.patient import PatientCreate, PatientUpdate
class CRUDPatient(CRUDBase[Patient, PatientCreate, PatientUpdate]):
def get_by_user_id(self, db: Session, *, user_id: int) -> Optional[Patient]:
return db.query(Patient).filter(Patient.user_id == user_id).first()
patient = CRUDPatient(Patient)

56
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,56 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
is_active=obj_in.is_active,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

0
app/db/__init__.py Normal file
View File

3
app/db/base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

6
app/db/base_class.py Normal file
View File

@ -0,0 +1,6 @@
from app.db.base import Base
# Import all models here for Alembic autogenerate
# Base imported for convenience and usability in other modules
__all__ = ["Base"]

28
app/db/session.py Normal file
View File

@ -0,0 +1,28 @@
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Create DB directory if it doesn't exist
DB_DIR = Path(settings.DB_DIR)
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = settings.SQLALCHEMY_DATABASE_URL
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
"""
Dependency for getting DB session.
"""
db = SessionLocal()
try:
yield db
finally:
db.close()

0
app/models/__init__.py Normal file
View File

37
app/models/appointment.py Normal file
View File

@ -0,0 +1,37 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, Enum, ForeignKey, Integer, Text
from sqlalchemy.orm import relationship
import enum
from app.db.base import Base
class AppointmentStatus(str, enum.Enum):
SCHEDULED = "scheduled"
CONFIRMED = "confirmed"
COMPLETED = "completed"
CANCELLED = "cancelled"
NO_SHOW = "no_show"
class Appointment(Base):
__tablename__ = "appointments"
id = Column(Integer, primary_key=True, index=True)
patient_id = Column(Integer, ForeignKey("patients.id"), nullable=False)
doctor_id = Column(Integer, ForeignKey("doctors.id"), nullable=False)
appointment_datetime = Column(DateTime, nullable=False)
duration_minutes = Column(Integer, default=30, nullable=False)
status = Column(
Enum(AppointmentStatus),
default=AppointmentStatus.SCHEDULED,
nullable=False
)
reason = Column(Text, nullable=True)
notes = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
patient = relationship("Patient", back_populates="appointments")
doctor = relationship("Doctor", back_populates="appointments")

23
app/models/doctor.py Normal file
View File

@ -0,0 +1,23 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from app.db.base import Base
class Doctor(Base):
__tablename__ = "doctors"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), unique=True, nullable=False)
specialty = Column(String, nullable=False)
license_number = Column(String, unique=True, nullable=False)
experience_years = Column(Integer, nullable=True)
bio = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
user = relationship("User", back_populates="doctor")
appointments = relationship("Appointment", back_populates="doctor")
schedules = relationship("DoctorSchedule", back_populates="doctor")

View File

@ -0,0 +1,35 @@
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, Integer, Time
from sqlalchemy.orm import relationship
import enum
from app.db.base import Base
class WeekDay(str, enum.Enum):
MONDAY = "monday"
TUESDAY = "tuesday"
WEDNESDAY = "wednesday"
THURSDAY = "thursday"
FRIDAY = "friday"
SATURDAY = "saturday"
SUNDAY = "sunday"
class DoctorSchedule(Base):
__tablename__ = "doctor_schedules"
id = Column(Integer, primary_key=True, index=True)
doctor_id = Column(Integer, ForeignKey("doctors.id"), nullable=False)
day_of_week = Column(Enum(WeekDay), nullable=False)
start_time = Column(Time, nullable=False)
end_time = Column(Time, nullable=False)
is_available = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
doctor = relationship("Doctor", back_populates="schedules")
class Config:
orm_mode = True

View File

@ -0,0 +1,23 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Text
from sqlalchemy.orm import relationship
from app.db.base import Base
class MedicalRecord(Base):
__tablename__ = "medical_records"
id = Column(Integer, primary_key=True, index=True)
patient_id = Column(Integer, ForeignKey("patients.id"), nullable=False)
doctor_id = Column(Integer, ForeignKey("doctors.id"), nullable=False)
diagnosis = Column(Text, nullable=True)
prescription = Column(Text, nullable=True)
notes = Column(Text, nullable=True)
visit_date = Column(DateTime, default=datetime.utcnow, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
patient = relationship("Patient", back_populates="medical_records")
doctor = relationship("Doctor")

25
app/models/patient.py Normal file
View File

@ -0,0 +1,25 @@
from datetime import datetime
from sqlalchemy import Column, Date, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from app.db.base import Base
class Patient(Base):
__tablename__ = "patients"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), unique=True, nullable=False)
date_of_birth = Column(Date, nullable=True)
blood_type = Column(String, nullable=True)
allergies = Column(Text, nullable=True)
medical_history = Column(Text, nullable=True)
emergency_contact_name = Column(String, nullable=True)
emergency_contact_phone = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
user = relationship("User", back_populates="patient")
appointments = relationship("Appointment", back_populates="patient")
medical_records = relationship("MedicalRecord", back_populates="patient")

22
app/models/user.py Normal file
View File

@ -0,0 +1,22 @@
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.orm import relationship
from app.db.base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
doctor = relationship("Doctor", back_populates="user", uselist=False)
patient = relationship("Patient", back_populates="user", uselist=False)

0
app/schemas/__init__.py Normal file
View File

View File

@ -0,0 +1,55 @@
from datetime import datetime
from typing import Optional
from enum import Enum
from pydantic import BaseModel
class AppointmentStatus(str, Enum):
SCHEDULED = "scheduled"
CONFIRMED = "confirmed"
COMPLETED = "completed"
CANCELLED = "cancelled"
NO_SHOW = "no_show"
# Shared properties
class AppointmentBase(BaseModel):
appointment_datetime: Optional[datetime] = None
duration_minutes: Optional[int] = 30
status: Optional[AppointmentStatus] = AppointmentStatus.SCHEDULED
reason: Optional[str] = None
notes: Optional[str] = None
# Properties to receive via API on creation
class AppointmentCreate(AppointmentBase):
patient_id: int
doctor_id: int
appointment_datetime: datetime
# Properties to receive via API on update
class AppointmentUpdate(AppointmentBase):
pass
class AppointmentInDBBase(AppointmentBase):
id: Optional[int] = None
patient_id: int
doctor_id: int
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Appointment(AppointmentInDBBase):
pass
# Additional properties stored in DB
class AppointmentInDB(AppointmentInDBBase):
pass

44
app/schemas/doctor.py Normal file
View File

@ -0,0 +1,44 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
# Shared properties
class DoctorBase(BaseModel):
specialty: Optional[str] = None
license_number: Optional[str] = None
experience_years: Optional[int] = None
bio: Optional[str] = None
# Properties to receive via API on creation
class DoctorCreate(DoctorBase):
user_id: int
specialty: str
license_number: str
# Properties to receive via API on update
class DoctorUpdate(DoctorBase):
pass
class DoctorInDBBase(DoctorBase):
id: Optional[int] = None
user_id: int
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Doctor(DoctorInDBBase):
pass
# Additional properties stored in DB
class DoctorInDB(DoctorInDBBase):
pass

View File

@ -0,0 +1,56 @@
from datetime import datetime, time
from typing import Optional
from enum import Enum
from pydantic import BaseModel
class WeekDay(str, Enum):
MONDAY = "monday"
TUESDAY = "tuesday"
WEDNESDAY = "wednesday"
THURSDAY = "thursday"
FRIDAY = "friday"
SATURDAY = "saturday"
SUNDAY = "sunday"
# Shared properties
class DoctorScheduleBase(BaseModel):
day_of_week: Optional[WeekDay] = None
start_time: Optional[time] = None
end_time: Optional[time] = None
is_available: Optional[bool] = True
# Properties to receive via API on creation
class DoctorScheduleCreate(DoctorScheduleBase):
doctor_id: int
day_of_week: WeekDay
start_time: time
end_time: time
# Properties to receive via API on update
class DoctorScheduleUpdate(DoctorScheduleBase):
pass
class DoctorScheduleInDBBase(DoctorScheduleBase):
id: Optional[int] = None
doctor_id: int
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class DoctorSchedule(DoctorScheduleInDBBase):
pass
# Additional properties stored in DB
class DoctorScheduleInDB(DoctorScheduleInDBBase):
pass

View File

@ -0,0 +1,45 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
# Shared properties
class MedicalRecordBase(BaseModel):
diagnosis: Optional[str] = None
prescription: Optional[str] = None
notes: Optional[str] = None
visit_date: Optional[datetime] = None
# Properties to receive via API on creation
class MedicalRecordCreate(MedicalRecordBase):
patient_id: int
doctor_id: int
visit_date: datetime
# Properties to receive via API on update
class MedicalRecordUpdate(MedicalRecordBase):
pass
class MedicalRecordInDBBase(MedicalRecordBase):
id: Optional[int] = None
patient_id: int
doctor_id: int
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class MedicalRecord(MedicalRecordInDBBase):
pass
# Additional properties stored in DB
class MedicalRecordInDB(MedicalRecordInDBBase):
pass

44
app/schemas/patient.py Normal file
View File

@ -0,0 +1,44 @@
from datetime import date, datetime
from typing import Optional
from pydantic import BaseModel
# Shared properties
class PatientBase(BaseModel):
date_of_birth: Optional[date] = None
blood_type: Optional[str] = None
allergies: Optional[str] = None
medical_history: Optional[str] = None
emergency_contact_name: Optional[str] = None
emergency_contact_phone: Optional[str] = None
# Properties to receive via API on creation
class PatientCreate(PatientBase):
user_id: int
# Properties to receive via API on update
class PatientUpdate(PatientBase):
pass
class PatientInDBBase(PatientBase):
id: Optional[int] = None
user_id: int
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Patient(PatientInDBBase):
pass
# Additional properties stored in DB
class PatientInDB(PatientInDBBase):
pass

12
app/schemas/token.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

42
app/schemas/user.py Normal file
View File

@ -0,0 +1,42 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

50
main.py Normal file
View File

@ -0,0 +1,50 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.api_v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description=settings.PROJECT_DESCRIPTION,
version=settings.VERSION,
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/")
async def root():
"""
Root endpoint that returns basic service information.
"""
return {
"title": settings.PROJECT_NAME,
"description": settings.PROJECT_DESCRIPTION,
"version": settings.VERSION,
"docs": "/docs",
"health": "/health",
}
@app.get("/health", status_code=200)
async def health_check():
"""
Health check endpoint.
"""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration for Alembic migrations.

84
migrations/env.py Normal file
View File

@ -0,0 +1,84 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.db.base import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == "sqlite"
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # Key configuration for SQLite
# Ensuring we're comparing type metadata, which is important for alembic autogenerate
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

26
migrations/script.py.mako Normal file
View File

@ -0,0 +1,26 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,117 @@
"""Initial migration
Revision ID: 001
Revises:
Create Date: 2023-06-09
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '001'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# users table
op.create_table(
'users',
sa.Column('id', sa.Integer(), primary_key=True, index=True),
sa.Column('email', sa.String(), unique=True, index=True, nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), default=True),
sa.Column('is_superuser', sa.Boolean(), default=False),
sa.Column('created_at', sa.DateTime(), default=sa.func.current_timestamp()),
sa.Column('updated_at', sa.DateTime(), default=sa.func.current_timestamp(), onupdate=sa.func.current_timestamp()),
)
# doctors table
op.create_table(
'doctors',
sa.Column('id', sa.Integer(), primary_key=True, index=True),
sa.Column('user_id', sa.Integer(), sa.ForeignKey('users.id'), unique=True, nullable=False),
sa.Column('specialty', sa.String(), nullable=False),
sa.Column('license_number', sa.String(), unique=True, nullable=False),
sa.Column('experience_years', sa.Integer(), nullable=True),
sa.Column('bio', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), default=sa.func.current_timestamp()),
sa.Column('updated_at', sa.DateTime(), default=sa.func.current_timestamp(), onupdate=sa.func.current_timestamp()),
)
# patients table
op.create_table(
'patients',
sa.Column('id', sa.Integer(), primary_key=True, index=True),
sa.Column('user_id', sa.Integer(), sa.ForeignKey('users.id'), unique=True, nullable=False),
sa.Column('date_of_birth', sa.Date(), nullable=True),
sa.Column('blood_type', sa.String(), nullable=True),
sa.Column('allergies', sa.Text(), nullable=True),
sa.Column('medical_history', sa.Text(), nullable=True),
sa.Column('emergency_contact_name', sa.String(), nullable=True),
sa.Column('emergency_contact_phone', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(), default=sa.func.current_timestamp()),
sa.Column('updated_at', sa.DateTime(), default=sa.func.current_timestamp(), onupdate=sa.func.current_timestamp()),
)
# doctor_schedules table
op.create_table(
'doctor_schedules',
sa.Column('id', sa.Integer(), primary_key=True, index=True),
sa.Column('doctor_id', sa.Integer(), sa.ForeignKey('doctors.id'), nullable=False),
sa.Column('day_of_week', sa.Enum('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday', name='weekday'), nullable=False),
sa.Column('start_time', sa.Time(), nullable=False),
sa.Column('end_time', sa.Time(), nullable=False),
sa.Column('is_available', sa.Boolean(), default=True, nullable=False),
sa.Column('created_at', sa.DateTime(), default=sa.func.current_timestamp()),
sa.Column('updated_at', sa.DateTime(), default=sa.func.current_timestamp(), onupdate=sa.func.current_timestamp()),
)
# appointments table
op.create_table(
'appointments',
sa.Column('id', sa.Integer(), primary_key=True, index=True),
sa.Column('patient_id', sa.Integer(), sa.ForeignKey('patients.id'), nullable=False),
sa.Column('doctor_id', sa.Integer(), sa.ForeignKey('doctors.id'), nullable=False),
sa.Column('appointment_datetime', sa.DateTime(), nullable=False),
sa.Column('duration_minutes', sa.Integer(), default=30, nullable=False),
sa.Column('status', sa.Enum('scheduled', 'confirmed', 'completed', 'cancelled', 'no_show', name='appointmentstatus'), default='scheduled', nullable=False),
sa.Column('reason', sa.Text(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), default=sa.func.current_timestamp()),
sa.Column('updated_at', sa.DateTime(), default=sa.func.current_timestamp(), onupdate=sa.func.current_timestamp()),
)
# medical_records table
op.create_table(
'medical_records',
sa.Column('id', sa.Integer(), primary_key=True, index=True),
sa.Column('patient_id', sa.Integer(), sa.ForeignKey('patients.id'), nullable=False),
sa.Column('doctor_id', sa.Integer(), sa.ForeignKey('doctors.id'), nullable=False),
sa.Column('diagnosis', sa.Text(), nullable=True),
sa.Column('prescription', sa.Text(), nullable=True),
sa.Column('notes', sa.Text(), nullable=True),
sa.Column('visit_date', sa.DateTime(), default=sa.func.current_timestamp(), nullable=False),
sa.Column('created_at', sa.DateTime(), default=sa.func.current_timestamp()),
sa.Column('updated_at', sa.DateTime(), default=sa.func.current_timestamp(), onupdate=sa.func.current_timestamp()),
)
def downgrade() -> None:
# Drop tables in reverse order of creation
op.drop_table('medical_records')
op.drop_table('appointments')
op.drop_table('doctor_schedules')
op.drop_table('patients')
op.drop_table('doctors')
op.drop_table('users')
# Drop enums
op.execute('DROP TYPE IF EXISTS appointmentstatus')
op.execute('DROP TYPE IF EXISTS weekday')

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
fastapi>=0.95.0,<0.96.0
uvicorn>=0.21.1,<0.22.0
sqlalchemy>=2.0.7,<2.1.0
pydantic>=2.0.0,<3.0.0
pydantic-settings>=2.0.0,<3.0.0
alembic>=1.10.2,<1.11.0
python-jose[cryptography]>=3.3.0,<3.4.0
passlib[bcrypt]>=1.7.4,<1.8.0
python-multipart>=0.0.6,<0.1.0
email-validator>=2.0.0,<3.0.0
ruff>=0.0.262,<0.1.0
pytest>=7.3.1,<7.4.0
httpx>=0.24.0,<0.25.0