from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.db.session import get_db from app.models.user import User from app.models.video import Video from app.models.translation import Translation from app.models.dubbed_audio import DubbedAudio from app.utils.auth import get_current_user from app.services.voice_cloning_service import clone_voice_and_generate_audio router = APIRouter() class DubbedAudioResponse(BaseModel): id: int video_id: int s3_url: str created_at: str class Config: orm_mode = True class VoiceCloningStartResponse(BaseModel): message: str video_id: int async def background_voice_clone(video_id: int, video_s3_url: str, translated_text: str, db: Session): try: # Update video status video = db.query(Video).filter(Video.id == video_id).first() if video: video.status = "voice_cloning" db.commit() # Generate dubbed audio with voice cloning dubbed_audio_url = await clone_voice_and_generate_audio(video_s3_url, translated_text) if dubbed_audio_url: # Save dubbed audio to database dubbed_audio = DubbedAudio( video_id=video_id, s3_url=dubbed_audio_url ) db.add(dubbed_audio) # Update video status if video: video.status = "voice_cloned" db.commit() else: # Update video status to error if video: video.status = "voice_cloning_failed" db.commit() except Exception: # Update video status to error video = db.query(Video).filter(Video.id == video_id).first() if video: video.status = "voice_cloning_failed" db.commit() @router.post("/clone/{video_id}", response_model=VoiceCloningStartResponse) async def start_voice_cloning( video_id: int, background_tasks: BackgroundTasks, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # Check if video exists and belongs to user video = db.query(Video).filter( Video.id == video_id, Video.user_id == current_user.id ).first() if not video: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Video not found" ) # Check if translation exists translation = db.query(Translation).filter( Translation.video_id == video_id ).first() if not translation: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Translation not found. Please translate the video first." ) # Check if dubbed audio already exists existing_dubbed_audio = db.query(DubbedAudio).filter( DubbedAudio.video_id == video_id ).first() if existing_dubbed_audio: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Dubbed audio already exists for this video" ) # Start background voice cloning background_tasks.add_task( background_voice_clone, video_id, video.original_s3_url, translation.text, db ) return VoiceCloningStartResponse( message="Voice cloning started in background", video_id=video_id ) @router.get("/{video_id}", response_model=DubbedAudioResponse) async def get_dubbed_audio( video_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): # Check if video exists and belongs to user video = db.query(Video).filter( Video.id == video_id, Video.user_id == current_user.id ).first() if not video: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Video not found" ) # Get dubbed audio dubbed_audio = db.query(DubbedAudio).filter( DubbedAudio.video_id == video_id ).first() if not dubbed_audio: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Dubbed audio not found" ) return DubbedAudioResponse( id=dubbed_audio.id, video_id=dubbed_audio.video_id, s3_url=dubbed_audio.s3_url, created_at=str(dubbed_audio.created_at) )