
Features:
- JWT authentication with user registration and login
- Video upload to Amazon S3 with file validation (200MB limit)
- Audio transcription using OpenAI Whisper API
- Text translation using GPT-4 API
- Voice cloning and audio synthesis using ElevenLabs API
- Video processing with ffmpeg for audio replacement
- Complete SQLite database with proper models and migrations
- Background task processing for long-running operations
- Health endpoint and comprehensive API documentation

Tech stack:
- FastAPI with SQLAlchemy ORM
- SQLite database with Alembic migrations
- Amazon S3 for file storage
- OpenAI APIs for transcription and translation
- ElevenLabs API for voice cloning
- ffmpeg for video processing
- JWT authentication with bcrypt password hashing
158 lines
4.5 KiB
Python
158 lines
4.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
|
|
from sqlalchemy.orm import Session
|
|
from pydantic import BaseModel
|
|
from app.db.session import get_db
|
|
from app.models.user import User
|
|
from app.models.video import Video
|
|
from app.models.translation import Translation
|
|
from app.models.dubbed_audio import DubbedAudio
|
|
from app.utils.auth import get_current_user
|
|
from app.services.voice_cloning_service import clone_voice_and_generate_audio
|
|
|
|
# Router on which this module's voice-cloning endpoints are registered.
router = APIRouter()
|
|
|
|
|
|
# Response payload describing one dubbed-audio record.
# (Kept as comments rather than a docstring so the generated API schema
# description is unchanged.)
class DubbedAudioResponse(BaseModel):
    id: int
    video_id: int
    s3_url: str
    # The timestamp travels as text, not as a datetime.
    created_at: str

    class Config:
        # Allow the model to be populated from attribute-style objects.
        orm_mode = True
|
|
|
|
|
|
# Acknowledgement payload returned when a voice-cloning job has been queued.
# (Kept as comments rather than a docstring so the generated API schema
# description is unchanged.)
class VoiceCloningStartResponse(BaseModel):
    message: str
    video_id: int
|
|
|
|
|
|
async def background_voice_clone(video_id: int, video_s3_url: str, translated_text: str, db: Session):
    """Run voice cloning for one video and record the outcome.

    The video is first marked ``voice_cloning``. The cloning service is then
    awaited; a truthy result is stored as a ``DubbedAudio`` row and the video
    is marked ``voice_cloned``, while a falsy result or any exception marks
    the video ``voice_cloning_failed``.

    This runs as a background task, so there is no caller to report to:
    nothing is returned and no exception propagates. Failures are logged
    and reflected in the video's status instead.

    Args:
        video_id: Primary key of the video being dubbed.
        video_s3_url: Source video location, passed through to the cloning
            service.
        translated_text: Text passed through to the cloning service.
        db: SQLAlchemy session used for every read and write here.
    """
    # Local import keeps this block self-contained; the module's import
    # header is left untouched.
    import logging

    logger = logging.getLogger(__name__)

    # NOTE(review): `db` is supplied by the request handler that schedules
    # this task. Confirm the session is still open when the task actually
    # runs; if it is closed with the request, this task needs its own session.
    try:
        video = db.query(Video).filter(Video.id == video_id).first()
        if video:
            video.status = "voice_cloning"
            db.commit()

        # Generate dubbed audio with voice cloning.
        dubbed_audio_url = await clone_voice_and_generate_audio(video_s3_url, translated_text)

        if dubbed_audio_url:
            db.add(DubbedAudio(video_id=video_id, s3_url=dubbed_audio_url))
            if video:
                video.status = "voice_cloned"
            # One commit persists both the new row and the status change.
            db.commit()
        else:
            logger.warning("Voice cloning produced no audio for video %s", video_id)
            if video:
                video.status = "voice_cloning_failed"
                db.commit()
    except Exception:
        # Top-level boundary of the background task: record the traceback
        # instead of discarding it.
        logger.exception("Voice cloning failed for video %s", video_id)
        try:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, so reset it before touching the database again.
            db.rollback()
            video = db.query(Video).filter(Video.id == video_id).first()
            if video:
                video.status = "voice_cloning_failed"
                db.commit()
        except Exception:
            # Best effort only: the status could not be written either.
            logger.exception(
                "Could not record voice cloning failure for video %s", video_id
            )
|
|
|
|
|
|
@router.post("/clone/{video_id}", response_model=VoiceCloningStartResponse)
async def start_voice_cloning(
    video_id: int,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Queue voice cloning for one of the current user's videos.
    #
    # Responds 404 when no video with this id belongs to the user, and 400
    # when the video has no translation yet or already has dubbed audio.
    # Otherwise the cloning job is scheduled and the response is returned
    # without waiting for it.
    # (Comments rather than a docstring, so the endpoint's generated API
    # description is unchanged.)

    # Ownership check: the video must exist and belong to the caller.
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Video not found",
        )

    # Cloning needs translated text to speak.
    video_translation = (
        db.query(Translation)
        .filter(Translation.video_id == video_id)
        .first()
    )
    if not video_translation:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Translation not found. Please translate the video first.",
        )

    # Refuse to dub a video that already has dubbed audio.
    prior_dub = (
        db.query(DubbedAudio)
        .filter(DubbedAudio.video_id == video_id)
        .first()
    )
    if prior_dub:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Dubbed audio already exists for this video",
        )

    # Hand the long-running work to the background task runner.
    task_args = (
        video_id,
        owned_video.original_s3_url,
        video_translation.text,
        db,
    )
    background_tasks.add_task(background_voice_clone, *task_args)

    acknowledgement = VoiceCloningStartResponse(
        message="Voice cloning started in background",
        video_id=video_id,
    )
    return acknowledgement
|
|
|
|
|
|
@router.get("/{video_id}", response_model=DubbedAudioResponse)
async def get_dubbed_audio(
    video_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Return the dubbed-audio record for one of the current user's videos.
    #
    # Responds 404 both when the video is not found for this user and when
    # the video has no dubbed audio.
    # (Comments rather than a docstring, so the endpoint's generated API
    # description is unchanged.)

    # Ownership check: the video must exist and belong to the caller.
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Video not found",
        )

    audio_row = (
        db.query(DubbedAudio)
        .filter(DubbedAudio.video_id == video_id)
        .first()
    )
    if not audio_row:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Dubbed audio not found",
        )

    # Build the payload field by field; created_at is rendered with str().
    payload = DubbedAudioResponse(
        id=audio_row.id,
        video_id=audio_row.video_id,
        s3_url=audio_row.s3_url,
        created_at=str(audio_row.created_at),
    )
    return payload