Automated Action 53d9909fb6 Create gamified kids learning API with FastAPI and SQLite
- Set up project structure with FastAPI and SQLite
- Implement user authentication with JWT
- Create models for learning content (subjects, lessons, quizzes)
- Add progress tracking and gamification features
- Implement comprehensive API documentation
- Add error handling and validation
- Set up proper logging and health check endpoint
2025-06-17 18:25:16 +00:00

348 lines
12 KiB
Python

from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from app.models.content import Answer, Question
from app.models.progress import ProgressStatus, UserAnswer, UserProgress
from app.models.user import User
from app.schemas.progress import UserAnswerCreate, UserProgressCreate, UserProgressUpdate
from app.services.user import user as user_service
from app.utils.db import CRUDBase
class CRUDUserProgress(CRUDBase[UserProgress, UserProgressCreate, UserProgressUpdate]):
    """
    Data-access helper for ``UserProgress`` rows.

    Adds per-user lookups, progress/completion updates and summary statistics
    on top of whatever ``CRUDBase`` provides.
    """

    def get_by_user_lesson(
        self, db: Session, *, user_id: int, lesson_id: int
    ) -> Optional[UserProgress]:
        """
        Get a user's progress for a specific lesson.

        Returns the first matching record, or None when the user has no
        progress record for that lesson.
        """
        return (
            db.query(UserProgress)
            .filter(UserProgress.user_id == user_id, UserProgress.lesson_id == lesson_id)
            .first()
        )

    def get_by_user(
        self, db: Session, *, user_id: int, skip: int = 0, limit: int = 100
    ) -> List[UserProgress]:
        """
        Get a user's progress for all lessons.

        ``skip`` and ``limit`` page through the result set.
        """
        return (
            db.query(UserProgress)
            .filter(UserProgress.user_id == user_id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_by_user_status(
        self, db: Session, *, user_id: int, status: ProgressStatus, skip: int = 0, limit: int = 100
    ) -> List[UserProgress]:
        """
        Get a user's progress filtered by status.

        ``skip`` and ``limit`` page through the result set.
        """
        return (
            db.query(UserProgress)
            .filter(UserProgress.user_id == user_id, UserProgress.status == status)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def update_progress(
        self, db: Session, *, user_id: int, lesson_id: int, progress_percentage: float
    ) -> UserProgress:
        """
        Update a user's progress for a lesson, creating the record if needed.

        Status rules (applied the same way to new and existing records):
        - progress >= 100 marks the lesson COMPLETED and stamps ``completed_at``;
        - progress > 0 moves a not-started lesson to IN_PROGRESS;
        - progress <= 0 on a new record leaves it NOT_STARTED.

        An existing COMPLETED record keeps its status and ``completed_at`` even
        if a lower percentage is written afterwards.

        The change is committed and the refreshed record is returned.
        """
        is_complete = progress_percentage >= 100
        db_obj = self.get_by_user_lesson(db, user_id=user_id, lesson_id=lesson_id)

        if db_obj is None:
            # Create new progress record. A record created at 0% stays
            # NOT_STARTED so that it matches the transition rule used for
            # existing records below (only progress > 0 means "in progress").
            if is_complete:
                initial_status = ProgressStatus.COMPLETED
            elif progress_percentage > 0:
                initial_status = ProgressStatus.IN_PROGRESS
            else:
                initial_status = ProgressStatus.NOT_STARTED
            db_obj = UserProgress(
                user_id=user_id,
                lesson_id=lesson_id,
                status=initial_status,
                progress_percentage=progress_percentage,
                completed_at=datetime.utcnow() if is_complete else None,
            )
        else:
            # Update existing progress record
            db_obj.progress_percentage = progress_percentage
            if is_complete and db_obj.status != ProgressStatus.COMPLETED:
                # First time the lesson reaches 100%: record when it happened.
                db_obj.status = ProgressStatus.COMPLETED
                db_obj.completed_at = datetime.utcnow()
            elif progress_percentage > 0 and db_obj.status == ProgressStatus.NOT_STARTED:
                db_obj.status = ProgressStatus.IN_PROGRESS

        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def complete_lesson(
        self, db: Session, *, user_id: int, lesson_id: int, points_earned: Optional[int] = None
    ) -> UserProgress:
        """
        Mark a lesson as completed and award points to the user.

        ``points_earned`` is stored on the progress record (0 for a new record
        when omitted; an existing record keeps its stored value when omitted).
        A non-zero ``points_earned`` is also passed to
        ``user_service.add_points``.

        The change is committed and the refreshed record is returned.
        """
        db_obj = self.get_by_user_lesson(db, user_id=user_id, lesson_id=lesson_id)

        if db_obj is None:
            # Create new completed progress record
            db_obj = UserProgress(
                user_id=user_id,
                lesson_id=lesson_id,
                status=ProgressStatus.COMPLETED,
                progress_percentage=100.0,
                completed_at=datetime.utcnow(),
                points_earned=points_earned or 0,
            )
        else:
            # Update existing progress record to completed
            db_obj.status = ProgressStatus.COMPLETED
            db_obj.progress_percentage = 100.0
            db_obj.completed_at = datetime.utcnow()
            if points_earned is not None:
                db_obj.points_earned = points_earned
        db.add(db_obj)

        # Award points to user if specified.
        # NOTE(review): this runs on every call, including for a lesson that
        # was already COMPLETED — confirm repeat completions should pay again.
        if points_earned:
            user_service.add_points(db, user_id=user_id, points=points_earned)

        db.commit()
        db.refresh(db_obj)
        return db_obj

    def _count_by_status(self, db: Session, *, user_id: int, status: ProgressStatus) -> int:
        """
        Count a user's progress records that have the given status.
        """
        return (
            db.query(func.count(UserProgress.id))
            .filter(UserProgress.user_id == user_id, UserProgress.status == status)
            .scalar()
            or 0
        )

    def get_user_progress_stats(self, db: Session, *, user_id: int) -> Dict[str, Any]:
        """
        Get statistics about a user's progress.

        Returns a dict with the number of completed and in-progress lessons,
        the sum of points stored on the user's progress records, and the
        user's level and points (1 and 0 when the user row is not found).
        """
        # Get counts of lessons by status
        completed_count = self._count_by_status(
            db, user_id=user_id, status=ProgressStatus.COMPLETED
        )
        in_progress_count = self._count_by_status(
            db, user_id=user_id, status=ProgressStatus.IN_PROGRESS
        )

        # Get total points earned from lessons; SUM over no rows yields NULL,
        # hence the fallback to 0.
        total_points = (
            db.query(func.sum(UserProgress.points_earned))
            .filter(UserProgress.user_id == user_id)
            .scalar()
            or 0
        )

        # Get user level and overall points
        user = db.query(User).filter(User.id == user_id).first()

        return {
            "completed_lessons": completed_count,
            "in_progress_lessons": in_progress_count,
            "total_lessons_points": total_points,
            "user_level": user.level if user else 1,
            "user_points": user.points if user else 0,
        }
class CRUDUserAnswer(CRUDBase[UserAnswer, UserAnswerCreate, UserAnswerCreate]):
    """
    Data-access helper for ``UserAnswer`` rows.

    Covers answer lookups, answer submission with point calculation, and
    per-quiz result summaries.
    """

    def get_by_user_question_attempt(
        self, db: Session, *, user_id: int, question_id: int, attempt_number: int
    ) -> Optional[UserAnswer]:
        """
        Get a user's answer for a specific question attempt.

        Returns None when no such attempt has been recorded.
        """
        return (
            db.query(UserAnswer)
            .filter(
                UserAnswer.user_id == user_id,
                UserAnswer.question_id == question_id,
                UserAnswer.attempt_number == attempt_number,
            )
            .first()
        )

    def get_by_user_question(
        self, db: Session, *, user_id: int, question_id: int
    ) -> List[UserAnswer]:
        """
        Get all of a user's answers for a specific question, ordered by
        attempt number.
        """
        return (
            db.query(UserAnswer)
            .filter(UserAnswer.user_id == user_id, UserAnswer.question_id == question_id)
            .order_by(UserAnswer.attempt_number)
            .all()
        )

    def get_by_quiz(self, db: Session, *, user_id: int, quiz_id: int) -> List[UserAnswer]:
        """
        Get a user's answers for all questions in a quiz (every attempt).
        """
        return (
            db.query(UserAnswer)
            .join(Question, UserAnswer.question_id == Question.id)
            .filter(UserAnswer.user_id == user_id, Question.quiz_id == quiz_id)
            .all()
        )

    def submit_answer(
        self,
        db: Session,
        *,
        user_id: int,
        question_id: int,
        answer_id: int,
        time_taken_seconds: Optional[int] = None,
    ) -> UserAnswer:
        """
        Submit a user's answer to a question.

        Records the attempt, computes the points it earns and, when points
        were earned, passes them to ``user_service.add_points``. The new
        record is committed, refreshed and returned.

        Raises:
            ValueError: if the answer does not exist or belongs to a different
                question, or if a correct answer's question row cannot be
                found to read its point value.
        """
        # Get the answer to check correctness
        answer = db.query(Answer).filter(Answer.id == answer_id).first()
        if not answer or answer.question_id != question_id:
            raise ValueError("Invalid answer for the question")

        # Calculate attempt number (previous attempts + 1)
        previous_attempts = (
            db.query(func.count(UserAnswer.id))
            .filter(UserAnswer.user_id == user_id, UserAnswer.question_id == question_id)
            .scalar()
            or 0
        )
        attempt_number = previous_attempts + 1

        # Calculate points based on correctness and attempt number.
        # Wrong answers earn nothing, so the question is only needed (and only
        # looked up) for correct ones.
        points_earned = 0
        if answer.is_correct:
            question = db.query(Question).filter(Question.id == question_id).first()
            if question is None:
                # Fail with the same exception type as the validation above
                # rather than an AttributeError on ``None.points``.
                raise ValueError("Question not found")
            if attempt_number == 1:
                # Full points for correct answer on first try
                points_earned = question.points
            else:
                # Reduced points for correct answers on subsequent attempts
                # Formula: points * (0.5)^(attempt_number-1)
                # This gives 50% for 2nd attempt, 25% for 3rd, etc.
                points_earned = int(question.points * (0.5 ** (attempt_number - 1)))

        # Create the user answer record
        user_answer = UserAnswer(
            user_id=user_id,
            question_id=question_id,
            answer_id=answer_id,
            is_correct=answer.is_correct,
            points_earned=points_earned,
            attempt_number=attempt_number,
            time_taken_seconds=time_taken_seconds,
        )
        db.add(user_answer)

        # Award points to user if answer is correct
        if points_earned > 0:
            user_service.add_points(db, user_id=user_id, points=points_earned)

        db.commit()
        db.refresh(user_answer)
        return user_answer

    def get_quiz_results(
        self, db: Session, *, user_id: int, quiz_id: int, pass_percentage: float = 70
    ) -> Dict[str, Any]:
        """
        Get a summary of a user's results for a quiz.

        Only the user's latest attempt at each question is considered. The
        score is the share of correct latest answers out of all questions in
        the quiz (unanswered questions count against the score), rounded to a
        whole percentage.

        Args:
            pass_percentage: minimum score percentage required to pass.
                Defaults to 70.

        Returns a dict with total/answered/correct counts, the score
        percentage, the points earned on the latest attempts and whether the
        user passed. A quiz without questions yields all zeros and
        ``passed=False``.
        """
        # Only the ids are needed, so avoid loading full Question rows.
        question_ids = [
            question_id
            for (question_id,) in db.query(Question.id)
            .filter(Question.quiz_id == quiz_id)
            .all()
        ]
        total_questions = len(question_ids)
        if not total_questions:
            return {
                "total_questions": 0,
                "questions_answered": 0,
                "correct_answers": 0,
                "score_percentage": 0,
                "points_earned": 0,
                "passed": False,
            }

        # Get the user's most recent answers for each question:
        # first find the highest attempt number per question ...
        latest_attempts = (
            db.query(
                UserAnswer.question_id, func.max(UserAnswer.attempt_number).label("max_attempt")
            )
            .filter(UserAnswer.user_id == user_id, UserAnswer.question_id.in_(question_ids))
            .group_by(UserAnswer.question_id)
            .subquery()
        )
        # ... then fetch the answer rows that match those attempts.
        latest_answers = (
            db.query(UserAnswer)
            .join(
                latest_attempts,
                and_(
                    UserAnswer.question_id == latest_attempts.c.question_id,
                    UserAnswer.attempt_number == latest_attempts.c.max_attempt,
                ),
            )
            .filter(UserAnswer.user_id == user_id)
            .all()
        )

        # Calculate statistics
        questions_answered = len(latest_answers)
        correct_answers = sum(1 for a in latest_answers if a.is_correct)
        # Treat a NULL points value as 0 instead of failing the whole summary.
        points_earned = sum(a.points_earned or 0 for a in latest_answers)

        # With nothing answered, correct_answers is 0 and this rounds to 0.
        score_percentage = round((correct_answers / total_questions) * 100)

        return {
            "total_questions": total_questions,
            "questions_answered": questions_answered,
            "correct_answers": correct_answers,
            "score_percentage": score_percentage,
            "points_earned": points_earned,
            "passed": score_percentage >= pass_percentage,
        }
# Module-level CRUD instances, each bound to the model class it manages.
# NOTE(review): presumably imported elsewhere as shared singletons — confirm
# against callers.
user_progress = CRUDUserProgress(UserProgress)
user_answer = CRUDUserAnswer(UserAnswer)