2025-07-01 12:54:48 +00:00

485 lines
14 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import List, Optional
from datetime import datetime, date
from app.db.base import get_db
from app.models.user import User
from app.models.attendance import (
Service,
AttendanceRecord,
AttendanceGoal,
ServiceType,
AttendanceStatus,
)
from app.schemas.attendance import (
ServiceCreate,
ServiceResponse,
AttendanceRecordCreate,
AttendanceRecordUpdate,
AttendanceRecordResponse,
AttendanceGoalCreate,
AttendanceGoalResponse,
AttendanceStats,
)
from app.api.auth import get_current_user
# Router on which the endpoint functions in this module register themselves
# through the @router.<method>(...) decorators.
router = APIRouter()
# Services
@router.post("/services", response_model=ServiceResponse)
def create_service(
    service: ServiceCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a new service (admin only).

    Builds a ``Service`` from the request payload with the caller stored as
    ``created_by``, commits it, and attaches the display-only attributes
    ``creator_name``, ``minister_name`` (only when a minister is linked) and
    ``attendance_count`` before returning it.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    payload = service.dict()
    new_service = Service(**payload, created_by=current_user.id)
    db.add(new_service)
    db.commit()
    db.refresh(new_service)

    # The caller is the creator, so the name comes straight from current_user.
    new_service.creator_name = f"{current_user.first_name} {current_user.last_name}"

    minister = new_service.minister
    if minister:
        new_service.minister_name = f"{minister.first_name} {minister.last_name}"

    # Nothing can have been recorded against a service that was just created.
    new_service.attendance_count = 0
    return new_service
@router.get("/services", response_model=List[ServiceResponse])
def get_services(
    skip: int = 0,
    limit: int = 100,
    service_type: Optional[ServiceType] = None,
    start_date: Optional[date] = None,
    end_date: Optional[date] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List active services, newest first, with display fields attached.

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        service_type: When given, only services of this type are returned.
        start_date: When given, only services on or after this date.
        end_date: When given, only services on or before this date.

    Returns:
        The matching ``Service`` rows, each carrying ``creator_name``,
        ``minister_name`` (only when a minister is linked) and
        ``attendance_count`` (records with status PRESENT or LATE).
    """
    query = db.query(Service).filter(Service.is_active)
    if service_type:
        query = query.filter(Service.service_type == service_type)
    if start_date:
        query = query.filter(Service.service_date >= start_date)
    if end_date:
        query = query.filter(Service.service_date <= end_date)

    services = (
        query.order_by(Service.service_date.desc()).offset(skip).limit(limit).all()
    )
    if not services:
        return services

    # One grouped query for the whole page instead of one COUNT per service.
    count_rows = (
        db.query(AttendanceRecord.service_id, func.count(AttendanceRecord.id))
        .filter(
            AttendanceRecord.service_id.in_([service.id for service in services]),
            AttendanceRecord.status.in_(
                [AttendanceStatus.PRESENT, AttendanceStatus.LATE]
            ),
        )
        .group_by(AttendanceRecord.service_id)
        .all()
    )
    attendance_counts = {service_id: count for service_id, count in count_rows}

    for service in services:
        service.creator_name = (
            f"{service.creator.first_name} {service.creator.last_name}"
        )
        if service.minister:
            service.minister_name = (
                f"{service.minister.first_name} {service.minister.last_name}"
            )
        # Services without any matching record are absent from the grouped
        # result, hence the 0 default.
        service.attendance_count = attendance_counts.get(service.id, 0)
    return services
@router.get("/services/{service_id}", response_model=ServiceResponse)
def get_service(
    service_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch a single service by id, with display fields attached.

    The returned object carries ``creator_name``, ``minister_name`` (only
    when a minister is linked) and ``attendance_count`` (records with status
    PRESENT or LATE).

    Raises:
        HTTPException: 404 when no service has the given id.
    """
    found = db.query(Service).filter(Service.id == service_id).first()
    if not found:
        raise HTTPException(status_code=404, detail="Service not found")

    creator = found.creator
    found.creator_name = f"{creator.first_name} {creator.last_name}"

    minister = found.minister
    if minister:
        found.minister_name = f"{minister.first_name} {minister.last_name}"

    counted_statuses = [AttendanceStatus.PRESENT, AttendanceStatus.LATE]
    attended_records = db.query(AttendanceRecord).filter(
        AttendanceRecord.service_id == found.id,
        AttendanceRecord.status.in_(counted_statuses),
    )
    found.attendance_count = attended_records.count()
    return found
# Attendance Records
@router.post(
    "/services/{service_id}/attendance", response_model=AttendanceRecordResponse
)
def record_attendance(
    service_id: int,
    attendance: AttendanceRecordCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Record attendance for a service (admin only).

    Creates one ``AttendanceRecord`` for the user named in the payload, with
    the caller stored as ``recorded_by``, and returns it with ``user_name``
    and ``recorder_name`` attached.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when the
            service or the target user does not exist; 400 when the user
            already has a record for this service.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    service = db.query(Service).filter(Service.id == service_id).first()
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    # Reject unknown users up front; otherwise the failure only surfaces
    # later, when the record is written or the user's name is read.
    attendee = db.query(User).filter(User.id == attendance.user_id).first()
    if not attendee:
        raise HTTPException(status_code=404, detail="User not found")

    # Check if attendance already recorded
    existing_record = (
        db.query(AttendanceRecord)
        .filter(
            AttendanceRecord.service_id == service_id,
            AttendanceRecord.user_id == attendance.user_id,
        )
        .first()
    )
    if existing_record:
        raise HTTPException(
            status_code=400, detail="Attendance already recorded for this user"
        )

    db_record = AttendanceRecord(
        **attendance.dict(), service_id=service_id, recorded_by=current_user.id
    )
    db.add(db_record)
    db.commit()
    db.refresh(db_record)

    db_record.user_name = f"{attendee.first_name} {attendee.last_name}"
    db_record.recorder_name = f"{current_user.first_name} {current_user.last_name}"
    return db_record
@router.get(
    "/services/{service_id}/attendance", response_model=List[AttendanceRecordResponse]
)
def get_service_attendance(
    service_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Get every attendance record of one service (admin only).

    Each returned record carries ``user_name`` and ``recorder_name``. The
    service itself is not looked up, so an unknown id yields an empty list.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    for_service = db.query(AttendanceRecord).filter(
        AttendanceRecord.service_id == service_id
    )
    entries = for_service.all()

    for entry in entries:
        attendee = entry.user
        entry.user_name = f"{attendee.first_name} {attendee.last_name}"
        recorder = entry.recorder
        entry.recorder_name = f"{recorder.first_name} {recorder.last_name}"
    return entries
@router.put("/attendance/{record_id}", response_model=AttendanceRecordResponse)
def update_attendance_record(
    record_id: int,
    attendance_update: AttendanceRecordUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update attendance record (admin only).

    Applies only the fields the client explicitly sent, commits, and returns
    the refreshed record with ``user_name`` and ``recorder_name`` attached,
    as the other endpoints returning ``AttendanceRecordResponse`` do.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when no
            record has the given id.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    record = db.query(AttendanceRecord).filter(AttendanceRecord.id == record_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Attendance record not found")

    # exclude_unset keeps omitted fields untouched instead of overwriting
    # them with schema defaults.
    for field, value in attendance_update.dict(exclude_unset=True).items():
        setattr(record, field, value)
    db.commit()
    db.refresh(record)

    # Attach the display names the same way the other record endpoints do.
    record.user_name = f"{record.user.first_name} {record.user.last_name}"
    record.recorder_name = (
        f"{record.recorder.first_name} {record.recorder.last_name}"
    )
    return record
# Personal Attendance
@router.get("/my-attendance", response_model=List[AttendanceRecordResponse])
def get_my_attendance(
    skip: int = 0,
    limit: int = 100,
    service_type: Optional[ServiceType] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Get the calling user's own attendance records, most recent first.

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        service_type: When given, only records whose service has this type.

    Returns:
        The caller's records ordered by ``recorded_at`` descending, each
        carrying ``user_name`` and ``recorder_name``.
    """
    mine = db.query(AttendanceRecord).filter(
        AttendanceRecord.user_id == current_user.id
    )
    if service_type:
        # The type lives on the service, so the filter needs the join.
        mine = mine.join(Service).filter(Service.service_type == service_type)

    newest_first = mine.order_by(AttendanceRecord.recorded_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    own_name = f"{current_user.first_name} {current_user.last_name}"
    for entry in page:
        entry.user_name = own_name
        recorder = entry.recorder
        entry.recorder_name = f"{recorder.first_name} {recorder.last_name}"
    return page
@router.get("/my-stats", response_model=AttendanceStats)
def get_my_attendance_stats(
    start_date: Optional[date] = None,
    end_date: Optional[date] = None,
    service_type: Optional[ServiceType] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Get personal attendance statistics.

    Args:
        start_date: First day of the reporting window; defaults to 1 January
            of the current year.
        end_date: Last day of the reporting window; defaults to today.
        service_type: When given, every figure is restricted to this type.

    Returns:
        ``AttendanceStats`` where

        * ``total_services`` counts active services in the window,
        * ``services_attended`` counts the caller's PRESENT/LATE records on
          those same active services, so the percentage cannot be inflated
          by attendance at services excluded from the total,
        * ``most_attended_service_type`` is the type with the most attended
          records in the window (or ``service_type`` itself when filtering),
        * ``current_streak`` is the number of consecutive most-recent active
          services attended; only the latest 10 services are examined, so
          the value is capped at 10,
        * ``longest_streak`` currently mirrors ``current_streak``.
    """
    today = date.today()
    # Default to current year if no dates provided
    if not start_date:
        start_date = date(today.year, 1, 1)
    if not end_date:
        end_date = today

    attended_statuses = [AttendanceStatus.PRESENT, AttendanceStatus.LATE]

    # Base queries. Both sides of the percentage use the same service
    # filters (window + is_active) so numerator and denominator agree.
    services_query = db.query(Service).filter(
        Service.service_date >= start_date,
        Service.service_date <= end_date,
        Service.is_active,
    )
    attended_query = (
        db.query(AttendanceRecord)
        .join(Service)
        .filter(
            AttendanceRecord.user_id == current_user.id,
            AttendanceRecord.status.in_(attended_statuses),
            Service.service_date >= start_date,
            Service.service_date <= end_date,
            Service.is_active,
        )
    )
    if service_type:
        services_query = services_query.filter(Service.service_type == service_type)
        attended_query = attended_query.filter(Service.service_type == service_type)

    total_services = services_query.count()
    # Count in the database instead of loading every record into memory.
    services_attended = attended_query.count()
    attendance_percentage = (
        (services_attended / total_services * 100) if total_services > 0 else 0
    )

    # Find most attended service type
    if not service_type:
        service_type_counts = (
            db.query(
                Service.service_type, func.count(AttendanceRecord.id).label("count")
            )
            .join(AttendanceRecord)
            .filter(
                AttendanceRecord.user_id == current_user.id,
                AttendanceRecord.status.in_(attended_statuses),
                Service.service_date >= start_date,
                Service.service_date <= end_date,
                Service.is_active,
            )
            .group_by(Service.service_type)
            .order_by(func.count(AttendanceRecord.id).desc())
            .first()
        )
        most_attended_service_type = (
            service_type_counts.service_type if service_type_counts else None
        )
    else:
        most_attended_service_type = service_type

    # Calculate streaks (simplified): walk the latest services from newest
    # to oldest and stop at the first one the user did not attend.
    recent_query = db.query(Service.id).filter(
        Service.service_date <= today,
        Service.is_active,
    )
    if service_type:
        recent_query = recent_query.filter(Service.service_type == service_type)
    recent_service_ids = [
        service_id
        for (service_id,) in recent_query.order_by(
            # Secondary key keeps same-day services in a deterministic order.
            Service.service_date.desc(),
            Service.id.desc(),
        )
        .limit(10)
        .all()
    ]

    attended_recent_ids = set()
    if recent_service_ids:
        # Single lookup for all candidate services instead of one per service.
        attended_recent_ids = {
            service_id
            for (service_id,) in db.query(AttendanceRecord.service_id)
            .filter(
                AttendanceRecord.service_id.in_(recent_service_ids),
                AttendanceRecord.user_id == current_user.id,
                AttendanceRecord.status.in_(attended_statuses),
            )
            .all()
        }

    current_streak = 0
    for service_id in recent_service_ids:
        if service_id not in attended_recent_ids:
            break
        current_streak += 1

    return AttendanceStats(
        total_services=total_services,
        services_attended=services_attended,
        attendance_percentage=attendance_percentage,
        most_attended_service_type=most_attended_service_type,
        current_streak=current_streak,
        longest_streak=current_streak,  # Simplified - would need more complex logic for actual longest
    )
# Attendance Goals
@router.post("/goals", response_model=AttendanceGoalResponse)
def create_attendance_goal(
    goal: AttendanceGoalCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an attendance goal for the calling user.

    Any goal the caller already has active for the same service type is
    deactivated first. The new goal is returned with ``current_percentage``
    attached: attended (PRESENT/LATE) services of that type divided by all
    services of that type, between the goal's start date and the earlier of
    its end date and today, or 0 when there are no such services.
    """
    # Deactivate existing goals for the same service type
    superseded = db.query(AttendanceGoal).filter(
        AttendanceGoal.user_id == current_user.id,
        AttendanceGoal.service_type == goal.service_type,
        AttendanceGoal.is_active,
    )
    superseded.update({"is_active": False})

    new_goal = AttendanceGoal(**goal.dict(), user_id=current_user.id)
    db.add(new_goal)
    db.commit()
    db.refresh(new_goal)

    # Progress is only measured up to today, even if the goal runs longer.
    window_end = min(goal.end_date, date.today())
    in_window = (
        Service.service_type == goal.service_type,
        Service.service_date >= goal.start_date,
        Service.service_date <= window_end,
    )

    held = db.query(Service).filter(*in_window).count()
    attended = (
        db.query(AttendanceRecord)
        .join(Service)
        .filter(
            *in_window,
            AttendanceRecord.user_id == current_user.id,
            AttendanceRecord.status.in_(
                [AttendanceStatus.PRESENT, AttendanceStatus.LATE]
            ),
        )
        .count()
    )

    if held > 0:
        new_goal.current_percentage = attended / held * 100
    else:
        new_goal.current_percentage = 0
    return new_goal
@router.get("/goals", response_model=List[AttendanceGoalResponse])
def get_my_attendance_goals(
    db: Session = Depends(get_db), current_user: User = Depends(get_current_user)
):
    """Get the calling user's active attendance goals with current progress.

    Each goal is returned with ``current_percentage`` attached: attended
    (PRESENT/LATE) services of the goal's type divided by all services of
    that type, between the goal's start date and the earlier of its end date
    and today, or 0 when there are no such services.
    """
    active_goals = (
        db.query(AttendanceGoal)
        .filter(AttendanceGoal.user_id == current_user.id, AttendanceGoal.is_active)
        .all()
    )

    for active_goal in active_goals:
        # Progress is only measured up to today, even if the goal runs longer.
        window_end = min(active_goal.end_date, date.today())
        in_window = (
            Service.service_type == active_goal.service_type,
            Service.service_date >= active_goal.start_date,
            Service.service_date <= window_end,
        )

        held = db.query(Service).filter(*in_window).count()
        attended = (
            db.query(AttendanceRecord)
            .join(Service)
            .filter(
                *in_window,
                AttendanceRecord.user_id == current_user.id,
                AttendanceRecord.status.in_(
                    [AttendanceStatus.PRESENT, AttendanceStatus.LATE]
                ),
            )
            .count()
        )

        if held > 0:
            active_goal.current_percentage = attended / held * 100
        else:
            active_goal.current_percentage = 0
    return active_goals