Implement HR platform backend with FastAPI and SQLite

- Set up project structure with FastAPI framework
- Create database models for users, employees, departments, and job titles
- Implement JWT authentication and authorization system
- Set up SQLite database with SQLAlchemy ORM
- Add Alembic migrations for database versioning
- Create CRUD API endpoints for employee management
- Implement category-based search functionality
- Add OpenAPI documentation and health check endpoint
- Update README with comprehensive setup and usage instructions
This commit is contained in:
Automated Action 2025-06-03 01:18:41 +00:00
parent 63098c86db
commit 27c9268a6a
43 changed files with 1967 additions and 2 deletions

169
README.md
View File

@ -1,3 +1,168 @@
# FastAPI Application
# HR Platform Backend
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A comprehensive HR platform backend built with FastAPI and SQLite, providing employee management, authentication, and category-based search functionality.
## Features
- **Authentication & Authorization**: JWT-based authentication system with user roles
- **Employee Management**: Complete CRUD operations for employees
- **Department Management**: Organize employees by departments
- **Job Title Management**: Manage job positions and titles
- **Category-Based Search**: Advanced search functionality by skills, categories, departments, and more
- **OpenAPI Documentation**: Interactive API documentation with Swagger UI and ReDoc
- **Health Check Endpoint**: Monitor the health of the API and database
## Tech Stack
- **Framework**: FastAPI
- **Database**: SQLite
- **ORM**: SQLAlchemy
- **Migrations**: Alembic
- **Authentication**: JWT (JSON Web Tokens)
- **API Documentation**: OpenAPI (Swagger and ReDoc)
## Project Structure
```
.
├── alembic.ini # Alembic configuration
├── app # Main application package
│ ├── api # API endpoints
│ │ ├── deps.py # API dependencies
│ │ ├── routes.py # API router
│ │ └── v1 # API version 1 endpoints
│ ├── core # Core application modules
│ │ ├── config.py # Application configuration
│ │ └── security.py # Security utilities
│ ├── crud # CRUD operations
│ ├── db # Database setup
│ │ ├── base.py # Base models import
│ │ ├── base_class.py # Base model class
│ │ ├── init_db.py # Database initialization
│ │ └── session.py # Database session
│ ├── models # SQLAlchemy models
│ └── schemas # Pydantic schemas
├── main.py # Application entry point
├── migrations # Alembic migrations
│ ├── env.py # Migration environment
│ ├── script.py.mako # Migration script template
│ └── versions # Migration versions
├── openapi.json # Generated OpenAPI schema
├── openapi_schema.py # Script to generate OpenAPI schema
└── requirements.txt # Python dependencies
```
## Getting Started
### Prerequisites
- Python 3.8 or higher
- pip (Python package installer)
### Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd hrplatformbackend
```
2. Install the dependencies:
```bash
pip install -r requirements.txt
```
3. Set environment variables (optional, defaults are provided):
```bash
export SECRET_KEY="your-secret-key"
export FIRST_SUPERUSER="admin@example.com"
export FIRST_SUPERUSER_PASSWORD="admin123"
```
### Running Migrations
Run Alembic migrations to set up the database schema:
```bash
alembic upgrade head
```
### Running the Application
Start the FastAPI server:
```bash
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
```
The API will be available at [http://localhost:8000](http://localhost:8000)
## API Documentation
Once the server is running, you can access the interactive API documentation:
- Swagger UI: [http://localhost:8000/docs](http://localhost:8000/docs)
- ReDoc: [http://localhost:8000/redoc](http://localhost:8000/redoc)
- OpenAPI JSON: [http://localhost:8000/openapi.json](http://localhost:8000/openapi.json)
## Usage
### Authentication
1. Register a new user:
```bash
curl -X POST "http://localhost:8000/api/v1/auth/register" \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "password": "password123", "full_name": "Example User"}'
```
2. Login to get access token:
```bash
curl -X POST "http://localhost:8000/api/v1/auth/login" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=user@example.com&password=password123"
```
3. Use the token for authenticated requests:
```bash
curl -X GET "http://localhost:8000/api/v1/users/me" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
```
### Employee Management
1. Create a new employee:
```bash
curl -X POST "http://localhost:8000/api/v1/employees/" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"first_name": "John",
"last_name": "Doe",
"email": "john.doe@example.com",
"hire_date": "2023-01-15",
"department_id": "department_id_here",
"job_title_id": "job_title_id_here"
}'
```
2. Search employees by category:
```bash
curl -X GET "http://localhost:8000/api/v1/employees/search/?categories=IT&skills=Python&department_id=dept_id" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
```
## Environment Variables
| Variable | Description | Default Value |
|---------------------------|-------------------------------------------|-------------------------------------------|
| PROJECT_NAME | Name of the project | HR Platform API |
| SECRET_KEY | Secret key for JWT encoding | Auto-generated |
| ACCESS_TOKEN_EXPIRE_MINUTES| JWT token expiration time in minutes | 11520 (8 days) |
| FIRST_SUPERUSER | Email for the initial admin user | admin@example.com |
| FIRST_SUPERUSER_PASSWORD | Password for the initial admin user | admin123 |
| SQLALCHEMY_DATABASE_URI | Database connection URI | sqlite:////app/storage/db/db.sqlite |
## License
This project is licensed under the MIT License - see the LICENSE file for details.

85
alembic.ini Normal file
View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL example
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

52
app/api/deps.py Normal file
View File

@ -0,0 +1,52 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import get_db
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

10
app/api/routes.py Normal file
View File

@ -0,0 +1,10 @@
from fastapi import APIRouter
from app.api.v1 import auth, users, employees, departments, job_titles
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["authentication"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(employees.router, prefix="/employees", tags=["employees"])
api_router.include_router(departments.router, prefix="/departments", tags=["departments"])
api_router.include_router(job_titles.router, prefix="/job-titles", tags=["job_titles"])

0
app/api/v1/__init__.py Normal file
View File

63
app/api/v1/auth.py Normal file
View File

@ -0,0 +1,63 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
router = APIRouter()
@router.post("/login", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=schemas.User)
def register_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
) -> Any:
"""
Register a new user
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.post("/test-token", response_model=schemas.User)
def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any:
"""
Test access token
"""
return current_user

113
app/api/v1/departments.py Normal file
View File

@ -0,0 +1,113 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Department])
def read_departments(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve departments.
"""
departments = crud.department.get_multi(db, skip=skip, limit=limit)
return departments
@router.post("/", response_model=schemas.Department)
def create_department(
*,
db: Session = Depends(deps.get_db),
department_in: schemas.DepartmentCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new department.
"""
department = crud.department.get_by_name(db, name=department_in.name)
if department:
raise HTTPException(
status_code=400,
detail="The department with this name already exists in the system",
)
department = crud.department.create(db, obj_in=department_in)
return department
@router.get("/{id}", response_model=schemas.Department)
def read_department(
*,
db: Session = Depends(deps.get_db),
id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get department by ID.
"""
department = crud.department.get(db, id=id)
if not department:
raise HTTPException(status_code=404, detail="Department not found")
return department
@router.put("/{id}", response_model=schemas.Department)
def update_department(
*,
db: Session = Depends(deps.get_db),
id: str,
department_in: schemas.DepartmentUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a department.
"""
department = crud.department.get(db, id=id)
if not department:
raise HTTPException(status_code=404, detail="Department not found")
# Check if name is changed and if the new name already exists
if department_in.name and department_in.name != department.name:
existing_department = crud.department.get_by_name(db, name=department_in.name)
if existing_department:
raise HTTPException(
status_code=400,
detail="The department with this name already exists in the system",
)
department = crud.department.update(db, db_obj=department, obj_in=department_in)
return department
@router.delete("/{id}", status_code=204, response_model=None)
def delete_department(
*,
db: Session = Depends(deps.get_db),
id: str,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a department.
"""
department = crud.department.get(db, id=id)
if not department:
raise HTTPException(status_code=404, detail="Department not found")
# Check if there are employees in this department
employees = db.query(models.Employee).filter(models.Employee.department_id == id).first()
if employees:
raise HTTPException(
status_code=400,
detail="Cannot delete department with employees. Reassign employees first.",
)
crud.department.remove(db, id=id)
return None

275
app/api/v1/employees.py Normal file
View File

@ -0,0 +1,275 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Employee])
def read_employees(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve employees.
"""
employees = crud.employee.get_all(db, skip=skip, limit=limit)
return employees
@router.post("/", response_model=schemas.Employee)
def create_employee(
*,
db: Session = Depends(deps.get_db),
employee_in: schemas.EmployeeCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new employee.
"""
# Check if email already exists
employee = crud.employee.get_by_email(db, email=employee_in.email)
if employee:
raise HTTPException(
status_code=400,
detail="The employee with this email already exists in the system",
)
# Check if department exists
department = crud.department.get(db, id=employee_in.department_id)
if not department:
raise HTTPException(
status_code=404,
detail=f"Department with id {employee_in.department_id} not found",
)
# Check if job title exists
job_title = crud.job_title.get(db, id=employee_in.job_title_id)
if not job_title:
raise HTTPException(
status_code=404,
detail=f"Job title with id {employee_in.job_title_id} not found",
)
# Check if manager exists (if provided)
if employee_in.manager_id:
manager = crud.employee.get(db, id=employee_in.manager_id)
if not manager:
raise HTTPException(
status_code=404,
detail=f"Manager with id {employee_in.manager_id} not found",
)
employee = crud.employee.create(db, obj_in=employee_in)
return employee
@router.get("/{employee_id}", response_model=schemas.EmployeeWithDetails)
def read_employee(
*,
db: Session = Depends(deps.get_db),
employee_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get employee by ID.
"""
employee = crud.employee.get(db, id=employee_id)
if not employee:
raise HTTPException(status_code=404, detail="Employee not found")
return employee
@router.put("/{employee_id}", response_model=schemas.Employee)
def update_employee(
*,
db: Session = Depends(deps.get_db),
employee_id: str,
employee_in: schemas.EmployeeUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update an employee.
"""
employee = crud.employee.get(db, id=employee_id)
if not employee:
raise HTTPException(status_code=404, detail="Employee not found")
# If email is being updated, check if it already exists
if employee_in.email and employee_in.email != employee.email:
existing_employee = crud.employee.get_by_email(db, email=employee_in.email)
if existing_employee:
raise HTTPException(
status_code=400,
detail="The employee with this email already exists in the system",
)
# Check if department exists (if provided)
if employee_in.department_id:
department = crud.department.get(db, id=employee_in.department_id)
if not department:
raise HTTPException(
status_code=404,
detail=f"Department with id {employee_in.department_id} not found",
)
# Check if job title exists (if provided)
if employee_in.job_title_id:
job_title = crud.job_title.get(db, id=employee_in.job_title_id)
if not job_title:
raise HTTPException(
status_code=404,
detail=f"Job title with id {employee_in.job_title_id} not found",
)
# Check if manager exists (if provided)
if employee_in.manager_id:
# Prevent self-assignment as manager
if employee_in.manager_id == employee_id:
raise HTTPException(
status_code=400,
detail="An employee cannot be their own manager",
)
manager = crud.employee.get(db, id=employee_in.manager_id)
if not manager:
raise HTTPException(
status_code=404,
detail=f"Manager with id {employee_in.manager_id} not found",
)
employee = crud.employee.update(db, db_obj=employee, obj_in=employee_in)
return employee
@router.delete("/{employee_id}", status_code=204, response_model=None)
def delete_employee(
*,
db: Session = Depends(deps.get_db),
employee_id: str,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete an employee.
"""
employee = crud.employee.get(db, id=employee_id)
if not employee:
raise HTTPException(status_code=404, detail="Employee not found")
# Check if this employee is a manager for other employees
subordinates = db.query(models.Employee).filter(models.Employee.manager_id == employee_id).first()
if subordinates:
raise HTTPException(
status_code=400,
detail="Cannot delete employee who is a manager. Reassign subordinates first.",
)
crud.employee.remove(db, id=employee_id)
return None
@router.get("/department/{department_id}", response_model=List[schemas.Employee])
def read_employees_by_department(
*,
db: Session = Depends(deps.get_db),
department_id: str,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve employees by department.
"""
# Check if department exists
department = crud.department.get(db, id=department_id)
if not department:
raise HTTPException(status_code=404, detail="Department not found")
employees = crud.employee.get_by_department(
db, department_id=department_id, skip=skip, limit=limit
)
return employees
@router.get("/job-title/{job_title_id}", response_model=List[schemas.Employee])
def read_employees_by_job_title(
*,
db: Session = Depends(deps.get_db),
job_title_id: str,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve employees by job title.
"""
# Check if job title exists
job_title = crud.job_title.get(db, id=job_title_id)
if not job_title:
raise HTTPException(status_code=404, detail="Job title not found")
employees = crud.employee.get_by_job_title(
db, job_title_id=job_title_id, skip=skip, limit=limit
)
return employees
@router.get("/manager/{manager_id}", response_model=List[schemas.Employee])
def read_employees_by_manager(
*,
db: Session = Depends(deps.get_db),
manager_id: str,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve employees by manager.
"""
# Check if manager exists
manager = crud.employee.get(db, id=manager_id)
if not manager:
raise HTTPException(status_code=404, detail="Manager not found")
employees = crud.employee.get_by_manager(
db, manager_id=manager_id, skip=skip, limit=limit
)
return employees
@router.get("/search/", response_model=List[schemas.Employee])
def search_employees(
*,
db: Session = Depends(deps.get_db),
department_id: Optional[str] = None,
job_title_id: Optional[str] = None,
name: Optional[str] = None,
skills: Optional[List[str]] = Query(None),
categories: Optional[List[str]] = Query(None),
is_active: Optional[bool] = True,
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Search employees by various criteria including categories and skills.
"""
search_params = schemas.EmployeeSearchParams(
department_id=department_id,
job_title_id=job_title_id,
name=name,
skills=skills,
categories=categories,
is_active=is_active,
)
employees = crud.employee.search(
db, params=search_params, skip=skip, limit=limit
)
return employees

113
app/api/v1/job_titles.py Normal file
View File

@ -0,0 +1,113 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.JobTitle])
def read_job_titles(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve job titles.
"""
job_titles = crud.job_title.get_multi(db, skip=skip, limit=limit)
return job_titles
@router.post("/", response_model=schemas.JobTitle)
def create_job_title(
*,
db: Session = Depends(deps.get_db),
job_title_in: schemas.JobTitleCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new job title.
"""
job_title = crud.job_title.get_by_title(db, title=job_title_in.title)
if job_title:
raise HTTPException(
status_code=400,
detail="The job title with this name already exists in the system",
)
job_title = crud.job_title.create(db, obj_in=job_title_in)
return job_title
@router.get("/{id}", response_model=schemas.JobTitle)
def read_job_title(
*,
db: Session = Depends(deps.get_db),
id: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get job title by ID.
"""
job_title = crud.job_title.get(db, id=id)
if not job_title:
raise HTTPException(status_code=404, detail="Job title not found")
return job_title
@router.put("/{id}", response_model=schemas.JobTitle)
def update_job_title(
*,
db: Session = Depends(deps.get_db),
id: str,
job_title_in: schemas.JobTitleUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a job title.
"""
job_title = crud.job_title.get(db, id=id)
if not job_title:
raise HTTPException(status_code=404, detail="Job title not found")
# Check if title is changed and if the new title already exists
if job_title_in.title and job_title_in.title != job_title.title:
existing_job_title = crud.job_title.get_by_title(db, title=job_title_in.title)
if existing_job_title:
raise HTTPException(
status_code=400,
detail="The job title with this name already exists in the system",
)
job_title = crud.job_title.update(db, db_obj=job_title, obj_in=job_title_in)
return job_title
@router.delete("/{id}", status_code=204, response_model=None)
def delete_job_title(
*,
db: Session = Depends(deps.get_db),
id: str,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Delete a job title.
"""
job_title = crud.job_title.get(db, id=id)
if not job_title:
raise HTTPException(status_code=404, detail="Job title not found")
# Check if there are employees with this job title
employees = db.query(models.Employee).filter(models.Employee.job_title_id == id).first()
if employees:
raise HTTPException(
status_code=400,
detail="Cannot delete job title assigned to employees. Reassign employees first.",
)
crud.job_title.remove(db, id=id)
return None

99
app/api/v1/users.py Normal file
View File

@ -0,0 +1,99 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.get("/me", response_model=schemas.User)
def read_user_me(
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: str,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: str,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

0
app/core/__init__.py Normal file
View File

40
app/core/config.py Normal file
View File

@ -0,0 +1,40 @@
from typing import List, Optional
from pydantic_settings import BaseSettings
from pydantic import validator, EmailStr
import secrets
class Settings(BaseSettings):
PROJECT_NAME: str = "HR Platform API"
API_V1_STR: str = "/api/v1"
# SECURITY
SECRET_KEY: str = secrets.token_urlsafe(32)
# 60 minutes * 24 hours * 8 days = 8 days
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
# CORS
BACKEND_CORS_ORIGINS: List[str] = ["*"]
# First superuser
FIRST_SUPERUSER: EmailStr = "admin@example.com"
FIRST_SUPERUSER_PASSWORD: str = "admin123"
# SQLITE DB
SQLALCHEMY_DATABASE_URI: Optional[str] = None
@validator("SQLALCHEMY_DATABASE_URI", pre=True)
def assemble_db_connection(cls, v: Optional[str], values: dict) -> str:
if v:
return v
from pathlib import Path
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
return f"sqlite:///{DB_DIR}/db.sqlite"
class Config:
case_sensitive = True
env_file = ".env"
settings = Settings()

30
app/core/security.py Normal file
View File

@ -0,0 +1,30 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

6
app/crud/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from app.crud.user import user
from app.crud.department import department
from app.crud.job_title import job_title
from app.crud.employee import employee
__all__ = ["user", "department", "job_title", "employee"]

60
app/crud/base.py Normal file
View File

@ -0,0 +1,60 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: Any) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

18
app/crud/department.py Normal file
View File

@ -0,0 +1,18 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.department import Department
from app.schemas.department import DepartmentCreate, DepartmentUpdate
class CRUDDepartment(CRUDBase[Department, DepartmentCreate, DepartmentUpdate]):
def get_by_name(self, db: Session, *, name: str) -> Optional[Department]:
return db.query(Department).filter(Department.name == name).first()
def get_all(self, db: Session) -> List[Department]:
return db.query(Department).all()
department = CRUDDepartment(Department)

64
app/crud/employee.py Normal file
View File

@ -0,0 +1,64 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import or_
from app.crud.base import CRUDBase
from app.models.employee import Employee
from app.schemas.employee import EmployeeCreate, EmployeeUpdate, EmployeeSearchParams
class CRUDEmployee(CRUDBase[Employee, EmployeeCreate, EmployeeUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[Employee]:
return db.query(Employee).filter(Employee.email == email).first()
def get_all(self, db: Session, *, skip: int = 0, limit: int = 100) -> List[Employee]:
return db.query(Employee).offset(skip).limit(limit).all()
def get_by_department(self, db: Session, *, department_id: str, skip: int = 0, limit: int = 100) -> List[Employee]:
return db.query(Employee).filter(Employee.department_id == department_id).offset(skip).limit(limit).all()
def get_by_job_title(self, db: Session, *, job_title_id: str, skip: int = 0, limit: int = 100) -> List[Employee]:
return db.query(Employee).filter(Employee.job_title_id == job_title_id).offset(skip).limit(limit).all()
def get_by_manager(self, db: Session, *, manager_id: str, skip: int = 0, limit: int = 100) -> List[Employee]:
return db.query(Employee).filter(Employee.manager_id == manager_id).offset(skip).limit(limit).all()
def search(self, db: Session, *, params: EmployeeSearchParams, skip: int = 0, limit: int = 100) -> List[Employee]:
query = db.query(Employee)
# Apply filters based on the search parameters
if params.department_id:
query = query.filter(Employee.department_id == params.department_id)
if params.job_title_id:
query = query.filter(Employee.job_title_id == params.job_title_id)
if params.is_active is not None:
query = query.filter(Employee.is_active == params.is_active)
if params.name:
query = query.filter(
or_(
Employee.first_name.ilike(f"%{params.name}%"),
Employee.last_name.ilike(f"%{params.name}%")
)
)
if params.skills:
skill_filters = []
for skill in params.skills:
skill_filters.append(Employee.skills.ilike(f"%{skill}%"))
if skill_filters:
query = query.filter(or_(*skill_filters))
if params.categories:
category_filters = []
for category in params.categories:
category_filters.append(Employee.categories.ilike(f"%{category}%"))
if category_filters:
query = query.filter(or_(*category_filters))
return query.offset(skip).limit(limit).all()
employee = CRUDEmployee(Employee)

18
app/crud/job_title.py Normal file
View File

@ -0,0 +1,18 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.job_title import JobTitle
from app.schemas.job_title import JobTitleCreate, JobTitleUpdate
class CRUDJobTitle(CRUDBase[JobTitle, JobTitleCreate, JobTitleUpdate]):
def get_by_title(self, db: Session, *, title: str) -> Optional[JobTitle]:
return db.query(JobTitle).filter(JobTitle.title == title).first()
def get_all(self, db: Session) -> List[JobTitle]:
return db.query(JobTitle).all()
job_title = CRUDJobTitle(JobTitle)

56
app/crud/user.py Normal file
View File

@ -0,0 +1,56 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
is_active=obj_in.is_active,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

0
app/db/__init__.py Normal file
View File

7
app/db/base.py Normal file
View File

@ -0,0 +1,7 @@
# Import all the models, so that Base has them before being
# imported by Alembic
from app.db.base_class import Base # noqa
from app.models.user import User # noqa
from app.models.employee import Employee # noqa
from app.models.department import Department # noqa
from app.models.job_title import JobTitle # noqa

13
app/db/base_class.py Normal file
View File

@ -0,0 +1,13 @@
from typing import Any
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: Any
__name__: str
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

51
app/db/init_db.py Normal file
View File

@ -0,0 +1,51 @@
from sqlalchemy.orm import Session
from app import crud, schemas
from app.core.config import settings
from app.db import base # noqa: F401
# make sure all SQL Alchemy models are imported (app.db.base) before initializing DB
# otherwise, SQL Alchemy might fail to initialize relationships properly
# for more details: https://github.com/tiangolo/full-stack-fastapi-postgresql/issues/28
def init_db(db: Session) -> None:
# Create first admin user if not exists
user = crud.user.get_by_email(db, email=settings.FIRST_SUPERUSER)
if not user:
user_in = schemas.UserCreate(
email=settings.FIRST_SUPERUSER,
password=settings.FIRST_SUPERUSER_PASSWORD,
is_superuser=True,
full_name="Initial Admin User",
)
user = crud.user.create(db, obj_in=user_in)
# Add some initial departments if none exist
if db.query(base.Department).count() == 0:
departments = [
{"name": "HR", "description": "Human Resources Department"},
{"name": "IT", "description": "Information Technology Department"},
{"name": "Finance", "description": "Finance and Accounting Department"},
{"name": "Marketing", "description": "Marketing and Sales Department"},
{"name": "Operations", "description": "Operations and Production Department"},
]
for dept in departments:
dept_in = schemas.DepartmentCreate(**dept)
crud.department.create(db, obj_in=dept_in)
# Add some initial job titles if none exist
if db.query(base.JobTitle).count() == 0:
job_titles = [
{"title": "HR Manager", "description": "Manages HR operations"},
{"title": "Software Engineer", "description": "Develops software applications"},
{"title": "Accountant", "description": "Handles financial records and accounting"},
{"title": "Marketing Specialist", "description": "Manages marketing campaigns"},
{"title": "Operations Manager", "description": "Manages day-to-day operations"},
]
for title in job_titles:
title_in = schemas.JobTitleCreate(**title)
crud.job_title.create(db, obj_in=title_in)

23
app/db/session.py Normal file
View File

@ -0,0 +1,23 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from pathlib import Path
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

0
app/models/__init__.py Normal file
View File

13
app/models/department.py Normal file
View File

@ -0,0 +1,13 @@
from sqlalchemy import Column, String, DateTime, Text
from sqlalchemy.sql import func
from uuid import uuid4
from app.db.base_class import Base
class Department(Base):
id = Column(String, primary_key=True, index=True, default=lambda: str(uuid4()))
name = Column(String, index=True, nullable=False, unique=True)
description = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

38
app/models/employee.py Normal file
View File

@ -0,0 +1,38 @@
from sqlalchemy import Column, String, DateTime, Text, ForeignKey, Date, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from uuid import uuid4
from app.db.base_class import Base
class Employee(Base):
id = Column(String, primary_key=True, index=True, default=lambda: str(uuid4()))
first_name = Column(String, index=True, nullable=False)
last_name = Column(String, index=True, nullable=False)
email = Column(String, unique=True, index=True, nullable=False)
phone = Column(String, nullable=True)
hire_date = Column(Date, nullable=False)
birth_date = Column(Date, nullable=True)
address = Column(Text, nullable=True)
# Foreign keys
department_id = Column(String, ForeignKey("department.id"), nullable=False)
job_title_id = Column(String, ForeignKey("jobtitle.id"), nullable=False)
manager_id = Column(String, ForeignKey("employee.id"), nullable=True)
# Relationships
department = relationship("Department", foreign_keys=[department_id])
job_title = relationship("JobTitle", foreign_keys=[job_title_id])
manager = relationship("Employee", remote_side=[id], backref="subordinates")
# Skills and categories for searching
skills = Column(Text, nullable=True) # Comma-separated list of skills
categories = Column(Text, nullable=True) # Comma-separated list of categories
# Status
is_active = Column(Boolean(), default=True)
# Audit fields
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

13
app/models/job_title.py Normal file
View File

@ -0,0 +1,13 @@
from sqlalchemy import Column, String, DateTime, Text
from sqlalchemy.sql import func
from uuid import uuid4
from app.db.base_class import Base
class JobTitle(Base):
id = Column(String, primary_key=True, index=True, default=lambda: str(uuid4()))
title = Column(String, index=True, nullable=False, unique=True)
description = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

16
app/models/user.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Boolean, Column, String, DateTime
from sqlalchemy.sql import func
from uuid import uuid4
from app.db.base_class import Base
class User(Base):
id = Column(String, primary_key=True, index=True, default=lambda: str(uuid4()))
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, index=True)
is_active = Column(Boolean(), default=True)
is_superuser = Column(Boolean(), default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

0
app/schemas/__init__.py Normal file
View File

38
app/schemas/department.py Normal file
View File

@ -0,0 +1,38 @@
from typing import Optional
from pydantic import BaseModel
from datetime import datetime
# Shared properties
class DepartmentBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
# Properties to receive via API on creation
class DepartmentCreate(DepartmentBase):
name: str
# Properties to receive via API on update
class DepartmentUpdate(DepartmentBase):
pass
class DepartmentInDBBase(DepartmentBase):
id: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Department(DepartmentInDBBase):
pass
# Additional properties stored in DB
class DepartmentInDB(DepartmentInDBBase):
pass

74
app/schemas/employee.py Normal file
View File

@ -0,0 +1,74 @@
from typing import Optional, List
from pydantic import BaseModel, EmailStr
from datetime import datetime, date
from app.schemas.department import Department
from app.schemas.job_title import JobTitle
# Shared properties
class EmployeeBase(BaseModel):
first_name: Optional[str] = None
last_name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
hire_date: Optional[date] = None
birth_date: Optional[date] = None
address: Optional[str] = None
department_id: Optional[str] = None
job_title_id: Optional[str] = None
manager_id: Optional[str] = None
skills: Optional[str] = None
categories: Optional[str] = None
is_active: Optional[bool] = True
# Properties to receive via API on creation
class EmployeeCreate(EmployeeBase):
first_name: str
last_name: str
email: EmailStr
hire_date: date
department_id: str
job_title_id: str
# Properties to receive via API on update
class EmployeeUpdate(EmployeeBase):
pass
class EmployeeInDBBase(EmployeeBase):
id: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class Employee(EmployeeInDBBase):
pass
# Additional properties to return via API with expanded relationships
class EmployeeWithDetails(EmployeeInDBBase):
department: Optional[Department] = None
job_title: Optional[JobTitle] = None
manager: Optional[Employee] = None
# Additional properties stored in DB
class EmployeeInDB(EmployeeInDBBase):
pass
# For searching employees
class EmployeeSearchParams(BaseModel):
department_id: Optional[str] = None
job_title_id: Optional[str] = None
skills: Optional[List[str]] = None
categories: Optional[List[str]] = None
name: Optional[str] = None # For searching by first_name or last_name
is_active: Optional[bool] = None

38
app/schemas/job_title.py Normal file
View File

@ -0,0 +1,38 @@
from typing import Optional
from pydantic import BaseModel
from datetime import datetime
# Shared properties
class JobTitleBase(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
# Properties to receive via API on creation
class JobTitleCreate(JobTitleBase):
title: str
# Properties to receive via API on update
class JobTitleUpdate(JobTitleBase):
pass
class JobTitleInDBBase(JobTitleBase):
id: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class JobTitle(JobTitleInDBBase):
pass
# Additional properties stored in DB
class JobTitleInDB(JobTitleInDBBase):
pass

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[str] = None

41
app/schemas/user.py Normal file
View File

@ -0,0 +1,41 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
from datetime import datetime
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

80
main.py Normal file
View File

@ -0,0 +1,80 @@
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import datetime
from sqlalchemy.orm import Session
from app.api.routes import api_router
from app.core.config import settings
from app.db import session
app = FastAPI(
title=settings.PROJECT_NAME,
description="""
HR Platform API provides functionality for managing employees, departments, and job titles.
## Authentication
All API endpoints are protected with JWT authentication (except for /auth/login, /auth/register, and /health).
To authenticate, you need to:
1. Login using the `/api/v1/auth/login` endpoint to get an access token
2. Include the token in the Authorization header for all requests: `Authorization: Bearer <token>`
## Features
* **User Management**: Create and manage user accounts with different permission levels
* **Employee Management**: Add, update, delete, and search employees
* **Department Management**: Organize employees by departments
* **Job Title Management**: Manage job positions
* **Category-Based Search**: Search employees by various criteria including skills and categories
""",
version="1.0.0",
openapi_url="/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
contact={
"name": "HR Platform Support",
"email": "support@hrplatform.example.com",
},
license_info={
"name": "MIT License",
"url": "https://opensource.org/licenses/MIT",
},
)
# Set up CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)
# Health check endpoint
@app.get("/health", tags=["health"])
async def health_check(db: Session = Depends(session.get_db)):
health_status = {
"status": "ok",
"api": "up",
"timestamp": datetime.datetime.now().isoformat(),
}
# Check database connection
try:
# Execute a simple query to check database connection
db.execute("SELECT 1")
health_status["database"] = "up"
except Exception as e:
health_status["database"] = "down"
health_status["database_error"] = str(e)
health_status["status"] = "degraded"
return health_status
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

83
migrations/env.py Normal file
View File

@ -0,0 +1,83 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from app.db.base import Base # noqa
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
render_as_batch=True, # For SQLite
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite, # For SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,111 @@
"""Initial tables
Revision ID: 001
Revises:
Create Date: 2023-10-30 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create user table
op.create_table(
'user',
sa.Column('id', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('is_superuser', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_full_name'), 'user', ['full_name'], unique=False)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
# Create department table
op.create_table(
'department',
sa.Column('id', sa.String(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_department_id'), 'department', ['id'], unique=False)
op.create_index(op.f('ix_department_name'), 'department', ['name'], unique=True)
# Create job_title table
op.create_table(
'jobtitle',
sa.Column('id', sa.String(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_jobtitle_id'), 'jobtitle', ['id'], unique=False)
op.create_index(op.f('ix_jobtitle_title'), 'jobtitle', ['title'], unique=True)
# Create employee table
op.create_table(
'employee',
sa.Column('id', sa.String(), nullable=False),
sa.Column('first_name', sa.String(), nullable=False),
sa.Column('last_name', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('hire_date', sa.Date(), nullable=False),
sa.Column('birth_date', sa.Date(), nullable=True),
sa.Column('address', sa.Text(), nullable=True),
sa.Column('department_id', sa.String(), nullable=False),
sa.Column('job_title_id', sa.String(), nullable=False),
sa.Column('manager_id', sa.String(), nullable=True),
sa.Column('skills', sa.Text(), nullable=True),
sa.Column('categories', sa.Text(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['department_id'], ['department.id'], ),
sa.ForeignKeyConstraint(['job_title_id'], ['jobtitle.id'], ),
sa.ForeignKeyConstraint(['manager_id'], ['employee.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_employee_email'), 'employee', ['email'], unique=True)
op.create_index(op.f('ix_employee_first_name'), 'employee', ['first_name'], unique=False)
op.create_index(op.f('ix_employee_id'), 'employee', ['id'], unique=False)
op.create_index(op.f('ix_employee_last_name'), 'employee', ['last_name'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_employee_last_name'), table_name='employee')
op.drop_index(op.f('ix_employee_id'), table_name='employee')
op.drop_index(op.f('ix_employee_first_name'), table_name='employee')
op.drop_index(op.f('ix_employee_email'), table_name='employee')
op.drop_table('employee')
op.drop_index(op.f('ix_jobtitle_title'), table_name='jobtitle')
op.drop_index(op.f('ix_jobtitle_id'), table_name='jobtitle')
op.drop_table('jobtitle')
op.drop_index(op.f('ix_department_name'), table_name='department')
op.drop_index(op.f('ix_department_id'), table_name='department')
op.drop_table('department')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_index(op.f('ix_user_full_name'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')

11
openapi_schema.py Normal file
View File

@ -0,0 +1,11 @@
import json
from main import app
# Generate the OpenAPI schema
schema = app.openapi()
# Write to file
with open("openapi.json", "w") as f:
json.dump(schema, f, indent=2)
print("OpenAPI schema has been generated at openapi.json")

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
fastapi>=0.95.0
uvicorn>=0.21.1
pydantic>=2.0.0
pydantic-settings>=2.0.0
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.5
sqlalchemy>=2.0.0
alembic>=1.11.0
tenacity>=8.2.2
email-validator>=2.0.0
ruff>=0.0.272
pytest>=7.3.1