
Features: - JWT authentication with user registration and login - Video upload to Amazon S3 with file validation (200MB limit) - Audio transcription using OpenAI Whisper API - Text translation using GPT-4 API - Voice cloning and audio synthesis using ElevenLabs API - Video processing with ffmpeg for audio replacement - Complete SQLite database with proper models and migrations - Background task processing for long-running operations - Health endpoint and comprehensive API documentation Tech stack: - FastAPI with SQLAlchemy ORM - SQLite database with Alembic migrations - Amazon S3 for file storage - OpenAI APIs for transcription and translation - ElevenLabs API for voice cloning - ffmpeg for video processing - JWT authentication with bcrypt password hashing
139 lines
4.4 KiB
Python
139 lines
4.4 KiB
Python
import os
|
|
import requests
|
|
from typing import Optional
|
|
import logging
|
|
import tempfile
|
|
import uuid
|
|
from app.services.s3_service import upload_file_to_s3, download_file_from_s3
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root of the ElevenLabs REST API; endpoint paths are appended to it.
ELEVENLABS_BASE_URL = "https://api.elevenlabs.io/v1"

# API key, read once from the environment at import time.
# It is None when the variable is unset.
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")

# Warn at import time so a missing (or empty) key is visible early;
# the service functions check the key again before each call.
if ELEVENLABS_API_KEY is None or ELEVENLABS_API_KEY == "":
    logger.warning("ElevenLabs API key not configured")
|
|
|
|
|
|
# (connect, read) timeouts in seconds for the text-to-speech request.
# Synthesis of long texts can take a while, so the read timeout is generous.
_TTS_REQUEST_TIMEOUT = (10, 300)

# Stock ElevenLabs voice used when no voice could be cloned from the video.
_FALLBACK_VOICE_ID = "21m00Tcm4TlvDq8ikWAM"


async def clone_voice_and_generate_audio(
    video_s3_url: str, text: str, voice_id: Optional[str] = None
) -> Optional[str]:
    """Synthesize *text* with ElevenLabs and upload the resulting MP3 to S3.

    When no ``voice_id`` is given, a voice is cloned from the video via
    ``create_voice_from_video``; if that fails too, a stock ElevenLabs voice
    is used instead.

    Args:
        video_s3_url: Location of the source video, used only for cloning.
        text: Text to convert to speech.
        voice_id: Existing ElevenLabs voice to use; skips cloning when set.

    Returns:
        The value returned by ``upload_file_to_s3`` for the generated audio
        (expected to be its S3 URL), or ``None`` when the API key is missing,
        ``text`` is empty, the API responds with a non-200 status, or any
        error occurs. Errors are logged, never raised.
    """
    import asyncio

    if not ELEVENLABS_API_KEY:
        logger.error("ElevenLabs API key not configured")
        return None

    # Bail out before cloning a voice that could never be used.
    if not text:
        logger.error("No text provided for audio generation")
        return None

    try:
        # If no voice_id provided, create one from the video.
        if not voice_id:
            voice_id = await create_voice_from_video(video_s3_url)

        if not voice_id:
            # Fallback to a default voice if voice cloning fails.
            voice_id = _FALLBACK_VOICE_ID
            logger.warning("Using default voice as voice cloning failed")

        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": ELEVENLABS_API_KEY,
        }

        # NOTE(review): the model id suggests an English-only model; confirm
        # it is the right choice if `text` may be in another language.
        payload = {
            "text": text,
            "model_id": "eleven_monolingual_v1",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.5,
            },
        }

        # `requests` is synchronous: run it in the default executor so the
        # event loop keeps serving other requests, and bound it with a
        # timeout so a stalled connection cannot hang this task forever.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(
                f"{ELEVENLABS_BASE_URL}/text-to-speech/{voice_id}",
                json=payload,
                headers=headers,
                timeout=_TTS_REQUEST_TIMEOUT,
            ),
        )

        if response.status_code != 200:
            logger.error(f"ElevenLabs API error: {response.status_code} - {response.text}")
            return None

        # Unique object name so concurrent jobs never overwrite each other.
        audio_filename = f"dubbed_audio/{uuid.uuid4()}.mp3"

        return await upload_file_to_s3(
            response.content,
            audio_filename,
            "audio/mpeg",
        )

    except Exception as e:
        # Boundary handler: callers rely on None rather than an exception.
        logger.error(f"Error generating dubbed audio: {e}")
        return None
|
|
|
|
|
|
# (connect, read) timeouts in seconds for the voice-cloning upload.
# The upload carries a whole video file, so the read timeout is generous.
_VOICE_CLONE_REQUEST_TIMEOUT = (10, 300)


async def create_voice_from_video(video_s3_url: str) -> Optional[str]:
    """Clone an ElevenLabs voice from a video stored in S3.

    The video is downloaded through ``download_file_from_s3``, written to a
    temporary file and uploaded to the ElevenLabs ``/voices/add`` endpoint.

    Args:
        video_s3_url: URL (or key) of the video; only the last path segment
            is passed to ``download_file_from_s3``.

    Returns:
        The ``voice_id`` from the ElevenLabs response, or ``None`` when the
        API key is missing, the download fails, the API responds with a
        non-200 status, or any error occurs. Errors are logged, never raised.
    """
    import asyncio
    from urllib.parse import urlsplit

    if not ELEVENLABS_API_KEY:
        logger.error("ElevenLabs API key not configured")
        return None

    if not video_s3_url:
        logger.error("No video URL provided for voice cloning")
        return None

    temp_file_path = None
    try:
        # Last path segment only. urlsplit drops any query string or
        # fragment (e.g. on a presigned URL) so it cannot leak into the name.
        # NOTE(review): this discards key prefixes such as "videos/"; confirm
        # download_file_from_s3 expects the bare file name.
        file_name = urlsplit(video_s3_url).path.split("/")[-1]
        if not file_name:
            logger.error("Could not determine file name from video URL")
            return None

        # Download video from S3.
        video_content = await download_file_from_s3(file_name)
        if not video_content:
            logger.error("Failed to download video from S3 for voice cloning")
            return None

        # delete=False because the file is reopened below; record the path
        # before writing so the `finally` clause cleans up a failed write.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(video_content)

        headers = {
            "xi-api-key": ELEVENLABS_API_KEY,
        }

        form_fields = {
            "name": f"cloned_voice_{uuid.uuid4()}",
            "description": "Voice cloned from uploaded video",
        }

        # `requests` is synchronous: run the upload in the default executor
        # so the event loop is not blocked, and bound it with a timeout.
        loop = asyncio.get_running_loop()
        with open(temp_file_path, "rb") as f:
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    f"{ELEVENLABS_BASE_URL}/voices/add",
                    headers=headers,
                    data=form_fields,
                    files={"files": f},
                    timeout=_VOICE_CLONE_REQUEST_TIMEOUT,
                ),
            )

        if response.status_code != 200:
            logger.error(f"Voice cloning failed: {response.status_code} - {response.text}")
            return None

        return response.json().get("voice_id")

    except Exception as e:
        # Boundary handler: callers fall back to a default voice on None.
        logger.error(f"Error cloning voice: {e}")
        return None

    finally:
        # Single cleanup point for success, failure and early-return paths.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_file_path}: {cleanup_error}")