
- FastAPI application with JWT authentication and role-based access control - Complete employee management with CRUD operations - Department management with manager assignments - Leave management system with approval workflow - Payroll processing with overtime and deductions calculation - Attendance tracking with clock in/out functionality - SQLite database with proper migrations using Alembic - Role-based permissions (Admin, HR Manager, Manager, Employee) - Comprehensive API documentation and health checks - CORS enabled for cross-origin requests Environment Variables Required: - SECRET_KEY: JWT secret key for token signing Features implemented: - User registration and authentication - Employee profile management - Department hierarchy management - Leave request creation and approval - Payroll record processing - Daily attendance tracking - Hours calculation for attendance - Proper error handling and validation
168 lines
6.3 KiB
Python
168 lines
6.3 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from app.db.session import get_db
|
|
from app.schemas.payroll import PayrollRecord, PayrollRecordCreate, PayrollRecordUpdate
|
|
from app.models.payroll import PayrollRecord as PayrollRecordModel, PayrollStatus
|
|
from app.models.users import User, UserRole
|
|
from app.models.employees import Employee
|
|
from app.core.deps import get_current_user, require_role
|
|
|
|
router = APIRouter()
|
|
|
|
def calculate_payroll(payroll_data: dict) -> dict:
|
|
"""Calculate gross pay and net pay"""
|
|
base_salary = payroll_data.get('base_salary', Decimal('0'))
|
|
overtime_hours = payroll_data.get('overtime_hours', Decimal('0'))
|
|
overtime_rate = payroll_data.get('overtime_rate', Decimal('0'))
|
|
bonus = payroll_data.get('bonus', Decimal('0'))
|
|
deductions = payroll_data.get('deductions', Decimal('0'))
|
|
tax_deductions = payroll_data.get('tax_deductions', Decimal('0'))
|
|
|
|
overtime_pay = overtime_hours * overtime_rate
|
|
gross_pay = base_salary + overtime_pay + bonus
|
|
net_pay = gross_pay - deductions - tax_deductions
|
|
|
|
return {
|
|
'gross_pay': gross_pay,
|
|
'net_pay': net_pay
|
|
}
|
|
|
|
@router.post("", response_model=PayrollRecord)
|
|
def create_payroll_record(
|
|
payroll: PayrollRecordCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role([UserRole.ADMIN, UserRole.HR_MANAGER]))
|
|
):
|
|
payroll_data = payroll.dict()
|
|
calculated_values = calculate_payroll(payroll_data)
|
|
|
|
db_payroll = PayrollRecordModel(
|
|
**payroll_data,
|
|
**calculated_values
|
|
)
|
|
db.add(db_payroll)
|
|
db.commit()
|
|
db.refresh(db_payroll)
|
|
return db_payroll
|
|
|
|
@router.get("", response_model=List[PayrollRecord])
|
|
def read_payroll_records(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
if current_user.role in [UserRole.ADMIN, UserRole.HR_MANAGER]:
|
|
# HR and Admin can see all payroll records
|
|
payroll_records = db.query(PayrollRecordModel).offset(skip).limit(limit).all()
|
|
else:
|
|
# Employees can only see their own payroll records
|
|
current_employee = db.query(Employee).filter(Employee.user_id == current_user.id).first()
|
|
if not current_employee:
|
|
raise HTTPException(status_code=404, detail="Employee profile not found")
|
|
payroll_records = db.query(PayrollRecordModel).filter(
|
|
PayrollRecordModel.employee_id == current_employee.id
|
|
).offset(skip).limit(limit).all()
|
|
|
|
return payroll_records
|
|
|
|
@router.get("/{payroll_id}", response_model=PayrollRecord)
|
|
def read_payroll_record(
|
|
payroll_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
payroll_record = db.query(PayrollRecordModel).filter(PayrollRecordModel.id == payroll_id).first()
|
|
if payroll_record is None:
|
|
raise HTTPException(status_code=404, detail="Payroll record not found")
|
|
|
|
# Check permissions
|
|
if current_user.role not in [UserRole.ADMIN, UserRole.HR_MANAGER]:
|
|
current_employee = db.query(Employee).filter(Employee.user_id == current_user.id).first()
|
|
if not current_employee or payroll_record.employee_id != current_employee.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not enough permissions"
|
|
)
|
|
|
|
return payroll_record
|
|
|
|
@router.put("/{payroll_id}", response_model=PayrollRecord)
|
|
def update_payroll_record(
|
|
payroll_id: int,
|
|
payroll_update: PayrollRecordUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role([UserRole.ADMIN, UserRole.HR_MANAGER]))
|
|
):
|
|
payroll_record = db.query(PayrollRecordModel).filter(PayrollRecordModel.id == payroll_id).first()
|
|
if payroll_record is None:
|
|
raise HTTPException(status_code=404, detail="Payroll record not found")
|
|
|
|
# Can only update draft records
|
|
if payroll_record.status != PayrollStatus.DRAFT:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Can only update draft payroll records"
|
|
)
|
|
|
|
update_data = payroll_update.dict(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(payroll_record, field, value)
|
|
|
|
# Recalculate totals
|
|
payroll_data = {
|
|
'base_salary': payroll_record.base_salary,
|
|
'overtime_hours': payroll_record.overtime_hours,
|
|
'overtime_rate': payroll_record.overtime_rate,
|
|
'bonus': payroll_record.bonus,
|
|
'deductions': payroll_record.deductions,
|
|
'tax_deductions': payroll_record.tax_deductions
|
|
}
|
|
calculated_values = calculate_payroll(payroll_data)
|
|
payroll_record.gross_pay = calculated_values['gross_pay']
|
|
payroll_record.net_pay = calculated_values['net_pay']
|
|
|
|
db.commit()
|
|
db.refresh(payroll_record)
|
|
return payroll_record
|
|
|
|
@router.post("/{payroll_id}/process")
|
|
def process_payroll_record(
|
|
payroll_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role([UserRole.ADMIN, UserRole.HR_MANAGER]))
|
|
):
|
|
payroll_record = db.query(PayrollRecordModel).filter(PayrollRecordModel.id == payroll_id).first()
|
|
if payroll_record is None:
|
|
raise HTTPException(status_code=404, detail="Payroll record not found")
|
|
|
|
if payroll_record.status != PayrollStatus.DRAFT:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Can only process draft payroll records"
|
|
)
|
|
|
|
payroll_record.status = PayrollStatus.PROCESSED
|
|
payroll_record.processed_by = current_user.id
|
|
payroll_record.processed_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
db.refresh(payroll_record)
|
|
return {"message": "Payroll record processed successfully"}
|
|
|
|
@router.delete("/{payroll_id}")
|
|
def delete_payroll_record(
|
|
payroll_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role([UserRole.ADMIN]))
|
|
):
|
|
payroll_record = db.query(PayrollRecordModel).filter(PayrollRecordModel.id == payroll_id).first()
|
|
if payroll_record is None:
|
|
raise HTTPException(status_code=404, detail="Payroll record not found")
|
|
|
|
db.delete(payroll_record)
|
|
db.commit()
|
|
return {"message": "Payroll record deleted successfully"} |