Automated Action 1754fec627 Create Bible Quiz App API with FastAPI and SQLite
- Set up project structure with FastAPI and SQLite
- Create models for users, questions, and quizzes
- Implement Alembic migrations with seed data
- Add user authentication with JWT
- Implement question management endpoints
- Implement quiz creation and management
- Add quiz-taking and scoring functionality
- Set up API documentation and health check endpoint
- Update README with comprehensive documentation
2025-06-03 15:46:44 +00:00

331 lines
7.6 KiB
Python

from datetime import datetime
from typing import List, Optional, Tuple
from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload
from app.models.quiz import (
Quiz,
QuizAttempt,
QuizQuestion,
QuizQuestionAnswer,
)
from app.schemas.quiz import (
AttemptStatus,
QuizAttemptCreate,
QuizAttemptUpdate,
QuizCreate,
QuizUpdate,
)
def get_by_id(db: Session, quiz_id: int) -> Optional[Quiz]:
    """
    Fetch a single quiz by its ID

    The quiz's ``questions`` relationship, and the ``question`` relationship
    of each of those entries, are eagerly joined into the same query.

    Returns the matching quiz, or None when no quiz has that ID.
    """
    eager_questions = joinedload(Quiz.questions).joinedload(QuizQuestion.question)
    matching = db.query(Quiz).options(eager_questions).filter(Quiz.id == quiz_id)
    return matching.first()
def get_multi(
    db: Session,
    *,
    user_id: Optional[int] = None,
    is_public: Optional[bool] = None,
    skip: int = 0,
    limit: int = 100,
) -> List[Quiz]:
    """
    Get multiple quizzes with optional filters

    Args:
        db: Database session used to run the query.
        user_id: When given, only quizzes whose ``user_id`` matches are returned.
        is_public: When given, only quizzes whose ``is_public`` flag equals
            this value are returned.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        The selected page of quizzes, ordered by quiz ID.
    """
    query = db.query(Quiz)
    if user_id is not None:
        query = query.filter(Quiz.user_id == user_id)
    if is_public is not None:
        query = query.filter(Quiz.is_public == is_public)
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the primary
    # key makes pagination stable.
    return query.order_by(Quiz.id).offset(skip).limit(limit).all()
def get_public_quizzes(
    db: Session,
    *,
    skip: int = 0,
    limit: int = 100,
) -> List[Quiz]:
    """
    List quizzes flagged as public

    Delegates to ``get_multi`` with ``is_public=True``; ``skip`` and ``limit``
    are passed through unchanged.
    """
    public_quizzes = get_multi(db, skip=skip, limit=limit, is_public=True)
    return public_quizzes
def get_user_quizzes(
    db: Session,
    *,
    user_id: int,
    skip: int = 0,
    limit: int = 100,
) -> List[Quiz]:
    """
    List quizzes belonging to one user

    Delegates to ``get_multi`` filtered on ``user_id``; ``skip`` and ``limit``
    are passed through unchanged.
    """
    owned_quizzes = get_multi(db, skip=skip, limit=limit, user_id=user_id)
    return owned_quizzes
def create(db: Session, *, obj_in: QuizCreate, user_id: int) -> Quiz:
    """
    Persist a new quiz together with its question entries

    The quiz row is built from every field of ``obj_in`` except ``questions``
    and is owned by ``user_id``. One ``QuizQuestion`` row is then created per
    entry in ``obj_in.questions``. Everything is committed in one transaction
    and the refreshed quiz is returned.
    """
    question_items = obj_in.questions
    quiz_fields = obj_in.dict(exclude={"questions"})

    quiz = Quiz(user_id=user_id, **quiz_fields)
    db.add(quiz)
    # Flush (without committing) so quiz.id is populated for the child rows.
    db.flush()

    # A generator keeps construction and session-add interleaved per item.
    db.add_all(
        QuizQuestion(quiz_id=quiz.id, **item.dict()) for item in question_items
    )

    db.commit()
    db.refresh(quiz)
    return quiz
def update(
    db: Session,
    *,
    db_obj: Quiz,
    obj_in: QuizUpdate
) -> Quiz:
    """
    Apply changes to a quiz and, optionally, replace its questions

    Only fields explicitly set on ``obj_in`` (other than ``questions``) are
    copied onto ``db_obj``. When ``obj_in.questions`` is not None, all existing
    ``QuizQuestion`` rows for the quiz are deleted and recreated from the
    supplied list. The change is committed and the refreshed quiz returned.
    """
    changed_fields = obj_in.dict(exclude={"questions"}, exclude_unset=True)
    for name, new_value in changed_fields.items():
        setattr(db_obj, name, new_value)

    replacement_questions = obj_in.questions
    if replacement_questions is not None:
        # Full replacement: delete the current rows, then insert the new set.
        db.query(QuizQuestion).filter(QuizQuestion.quiz_id == db_obj.id).delete()
        db.add_all(
            QuizQuestion(quiz_id=db_obj.id, **item.dict())
            for item in replacement_questions
        )

    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)
    return db_obj
def remove(db: Session, *, quiz_id: int) -> None:
    """
    Delete a quiz by ID, doing nothing when it does not exist

    NOTE(review): removal of the quiz's questions relies on a cascade
    configured on the model, which is not visible here; confirm.
    """
    quiz = db.query(Quiz).filter(Quiz.id == quiz_id).first()
    if not quiz:
        return
    db.delete(quiz)
    db.commit()
# Quiz attempt functions
def get_attempt_by_id(db: Session, attempt_id: int) -> Optional[QuizAttempt]:
    """
    Fetch a single quiz attempt by its ID

    The attempt's ``quiz`` and ``answers`` relationships are eagerly joined.

    Returns the matching attempt, or None when no attempt has that ID.
    """
    eager_loaders = (
        joinedload(QuizAttempt.quiz),
        joinedload(QuizAttempt.answers),
    )
    matching = (
        db.query(QuizAttempt)
        .options(*eager_loaders)
        .filter(QuizAttempt.id == attempt_id)
    )
    return matching.first()
def get_user_attempts(
    db: Session,
    *,
    user_id: int,
    quiz_id: Optional[int] = None,
    skip: int = 0,
    limit: int = 100,
) -> List[QuizAttempt]:
    """
    List a user's quiz attempts, most recently started first

    When ``quiz_id`` is given, only attempts at that quiz are returned.
    ``skip`` and ``limit`` select the page of results.
    """
    # Collect the WHERE conditions first; filter() ANDs them together.
    criteria = [QuizAttempt.user_id == user_id]
    if quiz_id is not None:
        criteria.append(QuizAttempt.quiz_id == quiz_id)

    newest_first = (
        db.query(QuizAttempt)
        .filter(*criteria)
        .order_by(QuizAttempt.started_at.desc())
    )
    return newest_first.offset(skip).limit(limit).all()
def create_attempt(
    db: Session,
    *,
    obj_in: QuizAttemptCreate,
    user_id: int
) -> QuizAttempt:
    """
    Start a new quiz attempt for a user

    The attempt is built from all fields of ``obj_in`` plus ``user_id``,
    committed, refreshed from the database and returned.
    """
    attempt_fields = obj_in.dict()
    attempt = QuizAttempt(user_id=user_id, **attempt_fields)
    db.add(attempt)
    db.commit()
    db.refresh(attempt)
    return attempt
def update_attempt(
    db: Session,
    *,
    db_obj: QuizAttempt,
    obj_in: QuizAttemptUpdate
) -> QuizAttempt:
    """
    Apply changes to an existing quiz attempt

    Only the fields explicitly set on ``obj_in`` are copied onto ``db_obj``;
    the attempt is then committed, refreshed and returned.
    """
    for name, new_value in obj_in.dict(exclude_unset=True).items():
        setattr(db_obj, name, new_value)

    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)
    return db_obj
def submit_answer(
    db: Session,
    *,
    attempt_id: int,
    question_id: int,
    selected_option_id: int,
) -> Optional[QuizQuestionAnswer]:
    """
    Submit (or change) an answer for a quiz question

    The selected option is validated against the question before anything is
    written, both for a first answer and for a changed answer. When an answer
    already exists for this attempt and question it is updated in place,
    including its ``is_correct`` flag, so later scoring reflects the new
    choice rather than the old one.

    Args:
        db: Database session.
        attempt_id: ID of the quiz attempt the answer belongs to.
        question_id: ID of the question being answered.
        selected_option_id: ID of the option the user picked.

    Returns:
        The created or updated answer, or None when no quiz question matching
        ``question_id`` is found for the attempt, or when the selected option
        is not found for that question.
    """
    # Kept as a function-level import as in the original code.
    # NOTE(review): presumably this avoids a circular import; confirm.
    from app.models.question import QuestionOption

    # Find the quiz question for this attempt and question ID.
    quiz_question = (
        db.query(QuizQuestion)
        .join(Quiz)
        .join(QuizAttempt)
        .filter(
            QuizAttempt.id == attempt_id,
            QuizQuestion.question_id == question_id,
        )
        .first()
    )
    if not quiz_question:
        return None

    # Validate the option BEFORE touching any existing answer, so a changed
    # answer cannot point at an option of some other question.
    option = (
        db.query(QuestionOption)
        .join(QuizQuestion.question)
        .filter(
            QuestionOption.id == selected_option_id,
            QuizQuestion.id == quiz_question.id,
        )
        .first()
    )
    if not option:
        return None

    answer = (
        db.query(QuizQuestionAnswer)
        .filter(
            QuizQuestionAnswer.quiz_attempt_id == attempt_id,
            QuizQuestionAnswer.quiz_question_id == quiz_question.id,
        )
        .first()
    )
    if answer is None:
        answer = QuizQuestionAnswer(
            quiz_attempt_id=attempt_id,
            quiz_question_id=quiz_question.id,
            selected_option_id=selected_option_id,
            is_correct=option.is_correct,
        )
    else:
        answer.selected_option_id = selected_option_id
        # Recompute correctness; otherwise the flag from the previous choice
        # would be kept and counted when the attempt is scored.
        answer.is_correct = option.is_correct
        answer.answered_at = datetime.utcnow()

    db.add(answer)
    db.commit()
    db.refresh(answer)
    return answer
def complete_attempt(
    db: Session, *, attempt_id: int
) -> Tuple[Optional[QuizAttempt], float]:
    """
    Complete a quiz attempt and calculate score

    The score is the percentage of the quiz's questions that have an answer
    marked correct in this attempt. The attempt is marked completed, stamped
    with the completion time and given the score, then committed.

    Args:
        db: Database session.
        attempt_id: ID of the attempt to complete.

    Returns:
        ``(attempt, score)`` with ``score`` as a float percentage, or
        ``(None, 0.0)`` when no attempt with that ID exists.
    """
    attempt = get_attempt_by_id(db, attempt_id=attempt_id)
    if not attempt:
        return None, 0.0

    # ``or 0`` guards against a driver returning None for the aggregate.
    total_questions = (
        db.query(func.count(QuizQuestion.id))
        .filter(QuizQuestion.quiz_id == attempt.quiz_id)
        .scalar()
    ) or 0
    correct_answers = (
        db.query(func.count(QuizQuestionAnswer.id))
        .filter(
            QuizQuestionAnswer.quiz_attempt_id == attempt_id,
            QuizQuestionAnswer.is_correct == True,  # noqa: E712
        )
        .scalar()
    ) or 0

    # Percentage score; a quiz with no questions scores 0.0 rather than
    # dividing by zero.
    score = (correct_answers / total_questions) * 100 if total_questions > 0 else 0.0

    attempt.status = AttemptStatus.completed
    attempt.score = score
    attempt.completed_at = datetime.utcnow()
    db.add(attempt)
    db.commit()
    db.refresh(attempt)
    return attempt, score