"""CRUD helpers for per-user lesson progress and quiz answers."""

from __future__ import annotations

from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy import and_, func
from sqlalchemy.orm import Session

from app.models.content import Answer, Question
from app.models.progress import ProgressStatus, UserAnswer, UserProgress
from app.models.user import User
from app.schemas.progress import UserAnswerCreate, UserProgressCreate, UserProgressUpdate
from app.services.user import user as user_service
from app.utils.db import CRUDBase

# Score (in percent) required to pass a quiz when the caller does not override it.
DEFAULT_QUIZ_PASS_PERCENTAGE = 70


def _points_for_attempt(base_points: int, attempt_number: int) -> int:
    """
    Return the points awarded for a correct answer on the given attempt.

    The first attempt earns ``base_points`` unchanged; every later attempt
    halves the award again (50% on the 2nd, 25% on the 3rd, ...), truncated
    to an integer.
    """
    if attempt_number == 1:
        return base_points
    return int(base_points * (0.5 ** (attempt_number - 1)))


class CRUDUserProgress(CRUDBase[UserProgress, UserProgressCreate, UserProgressUpdate]):
    """Queries and updates for ``UserProgress`` rows (one per user/lesson pair)."""

    def get_by_user_lesson(
        self, db: Session, *, user_id: int, lesson_id: int
    ) -> Optional[UserProgress]:
        """
        Get a user's progress for a specific lesson.

        Returns the first matching row, or ``None`` when the user has no
        progress record for that lesson.
        """
        return (
            db.query(UserProgress)
            .filter(UserProgress.user_id == user_id, UserProgress.lesson_id == lesson_id)
            .first()
        )

    def get_by_user(
        self, db: Session, *, user_id: int, skip: int = 0, limit: int = 100
    ) -> List[UserProgress]:
        """
        Get a user's progress for all lessons.

        Results are paginated with ``skip``/``limit`` and ordered by primary
        key so that consecutive pages are stable and do not overlap.
        """
        return (
            db.query(UserProgress)
            .filter(UserProgress.user_id == user_id)
            # OFFSET/LIMIT without ORDER BY returns rows in an unspecified order.
            .order_by(UserProgress.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_by_user_status(
        self,
        db: Session,
        *,
        user_id: int,
        status: ProgressStatus,
        skip: int = 0,
        limit: int = 100,
    ) -> List[UserProgress]:
        """
        Get a user's progress filtered by status.

        Results are paginated with ``skip``/``limit`` and ordered by primary
        key so that consecutive pages are stable and do not overlap.
        """
        return (
            db.query(UserProgress)
            .filter(UserProgress.user_id == user_id, UserProgress.status == status)
            # OFFSET/LIMIT without ORDER BY returns rows in an unspecified order.
            .order_by(UserProgress.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def update_progress(
        self, db: Session, *, user_id: int, lesson_id: int, progress_percentage: float
    ) -> UserProgress:
        """
        Update a user's progress for a lesson, creating the record if needed.

        A new record starts as COMPLETED when ``progress_percentage`` is at
        least 100 and as IN_PROGRESS otherwise. An existing record moves to
        COMPLETED on reaching 100, or from NOT_STARTED to IN_PROGRESS on any
        positive progress; an already completed record keeps its status and
        ``completed_at``. The change is committed before returning.
        """
        # NOTE(review): datetime.utcnow() produces a naive timestamp and is
        # deprecated since Python 3.12; confirm the column type before
        # switching to a timezone-aware value.
        db_obj = self.get_by_user_lesson(db, user_id=user_id, lesson_id=lesson_id)

        if not db_obj:
            # Create new progress record
            is_complete = progress_percentage >= 100
            db_obj = UserProgress(
                user_id=user_id,
                lesson_id=lesson_id,
                status=ProgressStatus.COMPLETED if is_complete else ProgressStatus.IN_PROGRESS,
                progress_percentage=progress_percentage,
                completed_at=datetime.utcnow() if is_complete else None,
            )
        else:
            # Update existing progress record
            db_obj.progress_percentage = progress_percentage

            # Update status based on progress
            if progress_percentage >= 100 and db_obj.status != ProgressStatus.COMPLETED:
                db_obj.status = ProgressStatus.COMPLETED
                db_obj.completed_at = datetime.utcnow()
            elif progress_percentage > 0 and db_obj.status == ProgressStatus.NOT_STARTED:
                db_obj.status = ProgressStatus.IN_PROGRESS

        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def complete_lesson(
        self, db: Session, *, user_id: int, lesson_id: int, points_earned: Optional[int] = None
    ) -> UserProgress:
        """
        Mark a lesson as completed and award points to the user.

        Creates the progress record when none exists. ``points_earned`` is
        stored on the record (0 for a new record when omitted; left untouched
        on an existing record when ``None``) and, when non-zero, is also
        passed to ``user_service.add_points``. The change is committed before
        returning.
        """
        db_obj = self.get_by_user_lesson(db, user_id=user_id, lesson_id=lesson_id)

        if not db_obj:
            # Create new completed progress record
            db_obj = UserProgress(
                user_id=user_id,
                lesson_id=lesson_id,
                status=ProgressStatus.COMPLETED,
                progress_percentage=100.0,
                completed_at=datetime.utcnow(),
                points_earned=points_earned or 0,
            )
        else:
            # Update existing progress record to completed
            db_obj.status = ProgressStatus.COMPLETED
            db_obj.progress_percentage = 100.0
            db_obj.completed_at = datetime.utcnow()
            if points_earned is not None:
                db_obj.points_earned = points_earned

        db.add(db_obj)

        # Award points to user if specified.
        # NOTE(review): points are awarded on every call, including repeat
        # completions of an already completed lesson - confirm this is intended.
        if points_earned:
            user_service.add_points(db, user_id=user_id, points=points_earned)

        db.commit()
        db.refresh(db_obj)
        return db_obj

    def get_user_progress_stats(self, db: Session, *, user_id: int) -> Dict[str, Any]:
        """
        Get statistics about a user's progress.

        Returns a dict with the number of completed and in-progress lessons,
        the sum of ``points_earned`` over the user's progress records, and the
        user's level and points (1 and 0 respectively when the user row is
        not found).
        """

        def count_with_status(status: ProgressStatus) -> int:
            # Number of this user's progress rows in the given status.
            return (
                db.query(func.count(UserProgress.id))
                .filter(UserProgress.user_id == user_id, UserProgress.status == status)
                .scalar()
                or 0
            )

        # Get counts of lessons by status
        completed_count = count_with_status(ProgressStatus.COMPLETED)
        in_progress_count = count_with_status(ProgressStatus.IN_PROGRESS)

        # Get total points earned from lessons (SUM yields NULL when there are no rows)
        total_points = (
            db.query(func.sum(UserProgress.points_earned))
            .filter(UserProgress.user_id == user_id)
            .scalar()
            or 0
        )

        # Get user level and overall points
        user = db.query(User).filter(User.id == user_id).first()

        return {
            "completed_lessons": completed_count,
            "in_progress_lessons": in_progress_count,
            "total_lessons_points": total_points,
            "user_level": user.level if user else 1,
            "user_points": user.points if user else 0,
        }


class CRUDUserAnswer(CRUDBase[UserAnswer, UserAnswerCreate, UserAnswerCreate]):
    """Queries and updates for ``UserAnswer`` rows (one per question attempt)."""

    def get_by_user_question_attempt(
        self, db: Session, *, user_id: int, question_id: int, attempt_number: int
    ) -> Optional[UserAnswer]:
        """
        Get a user's answer for a specific question attempt.

        Returns ``None`` when no answer exists for that attempt number.
        """
        return (
            db.query(UserAnswer)
            .filter(
                UserAnswer.user_id == user_id,
                UserAnswer.question_id == question_id,
                UserAnswer.attempt_number == attempt_number,
            )
            .first()
        )

    def get_by_user_question(
        self, db: Session, *, user_id: int, question_id: int
    ) -> List[UserAnswer]:
        """
        Get all of a user's answers for a specific question, ordered by attempt number.
        """
        return (
            db.query(UserAnswer)
            .filter(UserAnswer.user_id == user_id, UserAnswer.question_id == question_id)
            .order_by(UserAnswer.attempt_number)
            .all()
        )

    def get_by_quiz(self, db: Session, *, user_id: int, quiz_id: int) -> List[UserAnswer]:
        """
        Get a user's answers (every attempt) for all questions in a quiz.
        """
        return (
            db.query(UserAnswer)
            .join(Question, UserAnswer.question_id == Question.id)
            .filter(UserAnswer.user_id == user_id, Question.quiz_id == quiz_id)
            .all()
        )

    def submit_answer(
        self,
        db: Session,
        *,
        user_id: int,
        question_id: int,
        answer_id: int,
        time_taken_seconds: Optional[int] = None,
    ) -> UserAnswer:
        """
        Submit a user's answer to a question.

        Records the attempt, and for a correct answer awards the question's
        points on the first attempt or a halved amount for each later attempt.
        Any points awarded are also passed to ``user_service.add_points``.
        The change is committed before returning.

        Raises:
            ValueError: if ``answer_id`` does not exist or does not belong to
                ``question_id``, or if the question cannot be found when
                scoring a correct answer.
        """
        # Get the answer to check correctness
        answer = db.query(Answer).filter(Answer.id == answer_id).first()
        if not answer or answer.question_id != question_id:
            raise ValueError("Invalid answer for the question")

        # Calculate attempt number (previous attempts + 1).
        # NOTE(review): two concurrent submissions can compute the same
        # attempt number; confirm whether a unique constraint guards this.
        attempt_count = (
            db.query(func.count(UserAnswer.id))
            .filter(UserAnswer.user_id == user_id, UserAnswer.question_id == question_id)
            .scalar()
            or 0
        )
        attempt_number = attempt_count + 1

        # Calculate points based on correctness and attempt number
        question = db.query(Question).filter(Question.id == question_id).first()
        points_earned = 0
        if answer.is_correct:
            if question is None:
                # Previously this surfaced as an AttributeError on question.points.
                raise ValueError("Question not found")
            # Treat a missing points value as 0 so the comparison below cannot fail.
            points_earned = _points_for_attempt(question.points or 0, attempt_number)

        # Create the user answer record
        user_answer = UserAnswer(
            user_id=user_id,
            question_id=question_id,
            answer_id=answer_id,
            is_correct=answer.is_correct,
            points_earned=points_earned,
            attempt_number=attempt_number,
            time_taken_seconds=time_taken_seconds,
        )
        db.add(user_answer)

        # Award points to user if answer is correct
        if points_earned > 0:
            user_service.add_points(db, user_id=user_id, points=points_earned)

        db.commit()
        db.refresh(user_answer)
        return user_answer

    def get_quiz_results(
        self,
        db: Session,
        *,
        user_id: int,
        quiz_id: int,
        pass_percentage: int = DEFAULT_QUIZ_PASS_PERCENTAGE,
    ) -> Dict[str, Any]:
        """
        Get a summary of a user's results for a quiz.

        Only the user's latest attempt at each question is counted. The score
        is the share of correct latest answers over all questions in the quiz
        (unanswered questions count as wrong), rounded to a whole percent.

        Args:
            pass_percentage: minimum ``score_percentage`` for ``passed`` to be
                true; defaults to 70.

        Returns:
            A dict with ``total_questions``, ``questions_answered``,
            ``correct_answers``, ``score_percentage``, ``points_earned`` and
            ``passed``. All counters are zero and ``passed`` is false when the
            quiz has no questions.
        """
        # Get all questions for this quiz
        questions = db.query(Question).filter(Question.quiz_id == quiz_id).all()
        question_ids = [q.id for q in questions]

        if not question_ids:
            return {
                "total_questions": 0,
                "questions_answered": 0,
                "correct_answers": 0,
                "score_percentage": 0,
                "points_earned": 0,
                "passed": False,
            }

        # Highest attempt number the user has made for each question
        latest_attempts = (
            db.query(
                UserAnswer.question_id,
                func.max(UserAnswer.attempt_number).label("max_attempt"),
            )
            .filter(UserAnswer.user_id == user_id, UserAnswer.question_id.in_(question_ids))
            .group_by(UserAnswer.question_id)
            .subquery()
        )

        # Get the user's most recent answer for each question
        latest_answers = (
            db.query(UserAnswer)
            .join(
                latest_attempts,
                and_(
                    UserAnswer.question_id == latest_attempts.c.question_id,
                    UserAnswer.attempt_number == latest_attempts.c.max_attempt,
                ),
            )
            .filter(UserAnswer.user_id == user_id)
            .all()
        )

        # Calculate statistics
        total_questions = len(questions)
        questions_answered = len(latest_answers)
        correct_answers = sum(1 for a in latest_answers if a.is_correct)
        points_earned = sum(a.points_earned or 0 for a in latest_answers)

        # total_questions is non-zero here because of the early return above.
        score_percentage = round((correct_answers / total_questions) * 100)

        return {
            "total_questions": total_questions,
            "questions_answered": questions_answered,
            "correct_answers": correct_answers,
            "score_percentage": score_percentage,
            "points_earned": points_earned,
            "passed": score_percentage >= pass_percentage,
        }


user_progress = CRUDUserProgress(UserProgress)
user_answer = CRUDUserAnswer(UserAnswer)