Automated Action 92e4d992b2 Implement complete AI video dubbing backend with FastAPI
Features:
- JWT authentication with user registration and login
- Video upload to Amazon S3 with file validation (200MB limit)
- Audio transcription using OpenAI Whisper API
- Text translation using GPT-4 API
- Voice cloning and audio synthesis using ElevenLabs API
- Video processing with ffmpeg for audio replacement
- Complete SQLite database with proper models and migrations
- Background task processing for long-running operations
- Health endpoint and comprehensive API documentation

Tech stack:
- FastAPI with SQLAlchemy ORM
- SQLite database with Alembic migrations
- Amazon S3 for file storage
- OpenAI APIs for transcription and translation
- ElevenLabs API for voice cloning
- ffmpeg for video processing
- JWT authentication with bcrypt password hashing
2025-06-24 17:56:12 +00:00

158 lines
4.5 KiB
Python

import logging

from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.models.dubbed_audio import DubbedAudio
from app.models.translation import Translation
from app.models.user import User
from app.models.video import Video
from app.services.voice_cloning_service import clone_voice_and_generate_audio
from app.utils.auth import get_current_user
# Router on which the voice-cloning endpoints in this module are registered.
router = APIRouter()
class DubbedAudioResponse(BaseModel):
    """Response body describing one stored dubbed-audio record."""

    # Identifier of the dubbed-audio row itself.
    id: int
    # Identifier of the video the audio was generated for.
    video_id: int
    # Location of the generated audio file.
    s3_url: str
    # Creation timestamp, carried as a string.
    created_at: str

    class Config:
        # Allow the model to be populated from attribute-style (ORM) objects.
        orm_mode = True
class VoiceCloningStartResponse(BaseModel):
    """Acknowledgement returned once a voice-cloning job has been queued."""

    # Human-readable confirmation text.
    message: str
    # Identifier of the video the job was queued for.
    video_id: int
async def background_voice_clone(video_id: int, video_s3_url: str, translated_text: str, db: Session) -> None:
    """Run voice cloning for one video and record the outcome in the database.

    The video is first marked ``"voice_cloning"``. The coroutine then awaits
    ``clone_voice_and_generate_audio``; a truthy result is stored as a new
    ``DubbedAudio`` row and the video is marked ``"voice_cloned"``. A falsy
    result, or any exception, marks the video ``"voice_cloning_failed"``.

    Args:
        video_id: Primary key of the ``Video`` row being processed.
        video_s3_url: Forwarded unchanged to ``clone_voice_and_generate_audio``.
        translated_text: Forwarded unchanged to ``clone_voice_and_generate_audio``.
        db: SQLAlchemy session used for every query and commit in this task.

    Returns:
        None. Failures inside the main flow are logged and reflected in the
        video status instead of being raised to the caller.
    """
    log = logging.getLogger(__name__)

    try:
        # Flag the video as in progress (skipped when the row no longer exists).
        video = db.query(Video).filter(Video.id == video_id).first()
        if video:
            video.status = "voice_cloning"
            db.commit()

        # Generate dubbed audio with voice cloning.
        dubbed_audio_url = await clone_voice_and_generate_audio(video_s3_url, translated_text)

        if dubbed_audio_url:
            # Persist the result and the final status in a single commit.
            db.add(DubbedAudio(video_id=video_id, s3_url=dubbed_audio_url))
            if video:
                video.status = "voice_cloned"
            db.commit()
        else:
            log.warning("Voice cloning returned no audio URL for video %s", video_id)
            if video:
                video.status = "voice_cloning_failed"
                db.commit()
    except Exception:
        # Record why the job failed; previously the error vanished silently.
        log.exception("Voice cloning failed for video %s", video_id)

        # A failed flush/commit leaves the session unusable until it is rolled
        # back, so reset it before issuing the status update below.
        db.rollback()

        video = db.query(Video).filter(Video.id == video_id).first()
        if video:
            video.status = "voice_cloning_failed"
            db.commit()
@router.post("/clone/{video_id}", response_model=VoiceCloningStartResponse)
async def start_voice_cloning(
    video_id: int,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Queue a background voice-cloning job for one of the caller's videos.

    Raises:
        HTTPException: 404 when no video with this id belongs to the current
            user; 400 when the video has no translation yet, or when a
            dubbed-audio row already exists for it.

    Returns:
        A ``VoiceCloningStartResponse`` confirming that the job was queued.
    """
    # The lookup is scoped to the caller, so foreign videos look like missing ones.
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Video not found")

    # Cloning needs translated text to synthesise.
    video_translation = db.query(Translation).filter(Translation.video_id == video_id).first()
    if not video_translation:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Translation not found. Please translate the video first.",
        )

    # Refuse to regenerate audio that has already been stored.
    already_dubbed = db.query(DubbedAudio).filter(DubbedAudio.video_id == video_id).first()
    if already_dubbed:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Dubbed audio already exists for this video",
        )

    # NOTE(review): the request-scoped session is handed to the background
    # task; confirm get_db keeps it open until that task has finished.
    task_args = (video_id, owned_video.original_s3_url, video_translation.text, db)
    background_tasks.add_task(background_voice_clone, *task_args)

    return VoiceCloningStartResponse(
        message="Voice cloning started in background",
        video_id=video_id,
    )
@router.get("/{video_id}", response_model=DubbedAudioResponse)
async def get_dubbed_audio(
    video_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return the dubbed-audio record for one of the caller's videos.

    Raises:
        HTTPException: 404 when no video with this id belongs to the current
            user, or when that video has no dubbed-audio row.

    Returns:
        A ``DubbedAudioResponse`` built from the stored row, with
        ``created_at`` converted to a string.
    """
    # The lookup is scoped to the caller, so foreign videos look like missing ones.
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Video not found")

    # First dubbed-audio row recorded for this video, if any.
    record = db.query(DubbedAudio).filter(DubbedAudio.video_id == video_id).first()
    if not record:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Dubbed audio not found")

    payload = {
        "id": record.id,
        "video_id": record.video_id,
        "s3_url": record.s3_url,
        "created_at": str(record.created_at),
    }
    return DubbedAudioResponse(**payload)