
Features: - JWT authentication with user registration and login - Video upload to Amazon S3 with file validation (200MB limit) - Audio transcription using OpenAI Whisper API - Text translation using GPT-4 API - Voice cloning and audio synthesis using ElevenLabs API - Video processing with ffmpeg for audio replacement - Complete SQLite database with proper models and migrations - Background task processing for long-running operations - Health endpoint and comprehensive API documentation Tech stack: - FastAPI with SQLAlchemy ORM - SQLite database with Alembic migrations - Amazon S3 for file storage - OpenAI APIs for transcription and translation - ElevenLabs API for voice cloning - ffmpeg for video processing - JWT authentication with bcrypt password hashing
116 lines
3.9 KiB
Python
116 lines
3.9 KiB
Python
import os
|
|
import ffmpeg
|
|
from typing import Optional
|
|
import logging
|
|
import tempfile
|
|
import uuid
|
|
from app.services.s3_service import upload_file_to_s3, download_file_from_s3
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def process_video_with_dubbed_audio(video_s3_url: str, dubbed_audio_s3_url: str) -> Optional[str]:
|
|
try:
|
|
# Extract filenames from S3 URLs
|
|
video_filename = video_s3_url.split('/')[-1]
|
|
audio_filename = dubbed_audio_s3_url.split('/')[-1]
|
|
|
|
# Download video and audio from S3
|
|
video_content = await download_file_from_s3(video_filename)
|
|
audio_content = await download_file_from_s3(audio_filename)
|
|
|
|
if not video_content or not audio_content:
|
|
logger.error("Failed to download video or audio from S3")
|
|
return None
|
|
|
|
# Create temporary files
|
|
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as video_temp:
|
|
video_temp.write(video_content)
|
|
video_temp_path = video_temp.name
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as audio_temp:
|
|
audio_temp.write(audio_content)
|
|
audio_temp_path = audio_temp.name
|
|
|
|
# Create output temporary file
|
|
output_temp_path = tempfile.mktemp(suffix='.mp4')
|
|
|
|
try:
|
|
# Use ffmpeg to replace audio in video
|
|
input_video = ffmpeg.input(video_temp_path)
|
|
input_audio = ffmpeg.input(audio_temp_path)
|
|
|
|
# Combine video and new audio
|
|
out = ffmpeg.output(
|
|
input_video['v'], # video stream
|
|
input_audio['a'], # audio stream
|
|
output_temp_path,
|
|
vcodec='copy', # copy video codec (faster)
|
|
acodec='aac', # encode audio as AAC
|
|
strict='experimental'
|
|
)
|
|
|
|
# Run ffmpeg command
|
|
ffmpeg.run(out, overwrite_output=True, quiet=True)
|
|
|
|
# Read the processed video
|
|
with open(output_temp_path, 'rb') as f:
|
|
processed_video_content = f.read()
|
|
|
|
# Generate unique filename for processed video
|
|
processed_video_filename = f"processed_videos/{uuid.uuid4()}.mp4"
|
|
|
|
# Upload processed video to S3
|
|
s3_url = await upload_file_to_s3(
|
|
processed_video_content,
|
|
processed_video_filename,
|
|
"video/mp4"
|
|
)
|
|
|
|
# Clean up temporary files
|
|
os.unlink(video_temp_path)
|
|
os.unlink(audio_temp_path)
|
|
os.unlink(output_temp_path)
|
|
|
|
return s3_url
|
|
|
|
except Exception as e:
|
|
# Clean up temporary files on error
|
|
for temp_path in [video_temp_path, audio_temp_path, output_temp_path]:
|
|
if os.path.exists(temp_path):
|
|
os.unlink(temp_path)
|
|
raise e
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing video: {e}")
|
|
return None
|
|
|
|
|
|
def extract_audio_from_video(video_path: str, audio_output_path: str) -> bool:
|
|
try:
|
|
(
|
|
ffmpeg
|
|
.input(video_path)
|
|
.output(audio_output_path, acodec='mp3')
|
|
.overwrite_output()
|
|
.run(quiet=True)
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error extracting audio: {e}")
|
|
return False
|
|
|
|
|
|
def mute_video_audio(video_path: str, output_path: str) -> bool:
|
|
try:
|
|
(
|
|
ffmpeg
|
|
.input(video_path)
|
|
.output(output_path, vcodec='copy', an=None) # an=None removes audio
|
|
.overwrite_output()
|
|
.run(quiet=True)
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error muting video: {e}")
|
|
return False |