aivideodubbingapi-r08gi1/app/routes/video_processing.py
Automated Action 92e4d992b2 Implement complete AI video dubbing backend with FastAPI
Features:
- JWT authentication with user registration and login
- Video upload to Amazon S3 with file validation (200MB limit)
- Audio transcription using OpenAI Whisper API
- Text translation using GPT-4 API
- Voice cloning and audio synthesis using ElevenLabs API
- Video processing with ffmpeg for audio replacement
- Complete SQLite database with proper models and migrations
- Background task processing for long-running operations
- Health endpoint and comprehensive API documentation

Tech stack:
- FastAPI with SQLAlchemy ORM
- SQLite database with Alembic migrations
- Amazon S3 for file storage
- OpenAI APIs for transcription and translation
- ElevenLabs API for voice cloning
- ffmpeg for video processing
- JWT authentication with bcrypt password hashing
2025-06-24 17:56:12 +00:00

206 lines
6.2 KiB
Python

import logging

from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.models.user import User
from app.models.video import Video
from app.models.dubbed_audio import DubbedAudio
from app.models.dubbed_video import DubbedVideo
from app.utils.auth import get_current_user
from app.services.video_processing_service import process_video_with_dubbed_audio
# Router that the video-processing endpoints in this module register on.
router = APIRouter()
# Response body describing one stored dubbed video.
# Documented with comments instead of a class docstring so the generated
# schema for this model stays exactly as it was.
class DubbedVideoResponse(BaseModel):
    id: int
    video_id: int
    s3_url: str
    # Declared as text; the endpoint in this module passes str(created_at).
    created_at: str

    class Config:
        # pydantic (v1-style) switch for populating the model from ORM objects.
        orm_mode = True
# Acknowledgement returned once video processing has been queued.
# Documented with comments instead of a class docstring so the generated
# schema for this model stays exactly as it was.
class VideoProcessingStartResponse(BaseModel):
    message: str
    video_id: int
# Combined view of every artefact produced for one video.
# The results endpoint in this module fills the text/URL fields with "" when
# the corresponding record does not exist yet.
# Documented with comments instead of a class docstring so the generated
# schema for this model stays exactly as it was.
class VideoResultsResponse(BaseModel):
    video_id: int
    original_video_url: str
    transcript: str
    translated_text: str
    dubbed_audio_url: str
    final_dubbed_video_url: str
    # Copied from Video.status by the results endpoint.
    processing_status: str
async def background_process_video(
    video_id: int,
    video_s3_url: str,
    dubbed_audio_s3_url: str,
    db: Session,
) -> None:
    """Merge a video with its dubbed audio and record the outcome.

    The video's status is set to ``"processing_video"`` while the merge runs,
    to ``"completed"`` (together with a new ``DubbedVideo`` row) when
    ``process_video_with_dubbed_audio`` returns a URL, and to
    ``"video_processing_failed"`` when it returns nothing or an error occurs.

    Args:
        video_id: Primary key of the ``Video`` row being processed.
        video_s3_url: Location of the original video.
        dubbed_audio_s3_url: Location of the dubbed audio track.
        db: Session used for all status updates.

    Returns:
        None. Failures are logged and written to the video's status instead of
        being raised, because nothing awaits the result of a background task.
    """
    # NOTE(review): the session is supplied by the caller; confirm it is still
    # usable by the time this task runs.
    log = logging.getLogger(__name__)
    try:
        video = db.query(Video).filter(Video.id == video_id).first()
        if video:
            video.status = "processing_video"
            db.commit()

        processed_video_url = await process_video_with_dubbed_audio(
            video_s3_url, dubbed_audio_s3_url
        )

        if processed_video_url:
            db.add(DubbedVideo(video_id=video_id, s3_url=processed_video_url))
            if video:
                video.status = "completed"
            # One commit covers both the new row and the status change.
            db.commit()
        else:
            log.error("Video processing returned no URL for video_id=%s", video_id)
            if video:
                video.status = "video_processing_failed"
                db.commit()
    except Exception:
        # Keep the traceback: without it the only trace of the failure is the
        # status string.
        log.exception("Video processing failed for video_id=%s", video_id)
        try:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, so reset it before touching the database again.
            db.rollback()
            video = db.query(Video).filter(Video.id == video_id).first()
            if video:
                video.status = "video_processing_failed"
                db.commit()
        except Exception:
            log.exception(
                "Could not record failure status for video_id=%s", video_id
            )
# POST /{video_id}: queue the merge of a video with its dubbed audio.
#
# Responds 404 when the video does not exist for the current user, 400 when no
# dubbed audio exists yet or a dubbed video is already stored, and otherwise
# schedules background_process_video and returns straight away.
# Written as comments rather than a docstring because FastAPI publishes an
# endpoint's docstring as its OpenAPI description.
@router.post("/{video_id}", response_model=VideoProcessingStartResponse)
async def start_video_processing(
    video_id: int,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Ownership is part of the lookup, so someone else's video is "not found".
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Video not found",
        )

    # The merge needs an audio track to lay over the video.
    audio_track = (
        db.query(DubbedAudio)
        .filter(DubbedAudio.video_id == video_id)
        .first()
    )
    if not audio_track:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Dubbed audio not found. Please generate dubbed audio first.",
        )

    # Refuse to produce a second result for the same video.
    already_rendered = (
        db.query(DubbedVideo)
        .filter(DubbedVideo.video_id == video_id)
        .first()
    )
    if already_rendered:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Dubbed video already exists for this video",
        )

    # NOTE(review): the request-scoped session is handed to the background
    # task; confirm it is still open when the task actually runs.
    background_tasks.add_task(
        background_process_video,
        video_id,
        owned_video.original_s3_url,
        audio_track.s3_url,
        db,
    )

    return VideoProcessingStartResponse(
        video_id=video_id,
        message="Video processing started in background",
    )
# GET /{video_id}: return the stored dubbed video for one of the caller's videos.
#
# Responds 404 when the video does not exist for the current user, and 404
# with a different message when no dubbed video has been stored for it.
# Written as comments rather than a docstring because FastAPI publishes an
# endpoint's docstring as its OpenAPI description.
@router.get("/{video_id}", response_model=DubbedVideoResponse)
async def get_processed_video(
    video_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Ownership is part of the lookup, so someone else's video is "not found".
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Video not found",
        )

    rendered = (
        db.query(DubbedVideo)
        .filter(DubbedVideo.video_id == video_id)
        .first()
    )
    if not rendered:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Processed video not found",
        )

    # created_at is declared as text on the response model, hence str().
    created_text = str(rendered.created_at)
    return DubbedVideoResponse(
        id=rendered.id,
        video_id=rendered.video_id,
        s3_url=rendered.s3_url,
        created_at=created_text,
    )
# GET /results/{video_id}: summarise everything produced for one video.
#
# Responds 404 when the video does not exist for the current user. Each
# artefact that has not been produced yet is reported as an empty string.
# Written as comments rather than a docstring because FastAPI publishes an
# endpoint's docstring as its OpenAPI description.
@router.get("/results/{video_id}", response_model=VideoResultsResponse)
async def get_video_results(
    video_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Ownership is part of the lookup, so someone else's video is "not found".
    owned_video = (
        db.query(Video)
        .filter(Video.id == video_id, Video.user_id == current_user.id)
        .first()
    )
    if not owned_video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Video not found",
        )

    # These two models are imported here rather than at module level, as in
    # the original code.
    from app.models.transcription import Transcription
    from app.models.translation import Translation

    def first_row_for_video(model):
        # First record of `model` attached to this video, or None.
        return db.query(model).filter(model.video_id == video_id).first()

    transcription_row = first_row_for_video(Transcription)
    translation_row = first_row_for_video(Translation)
    audio_row = first_row_for_video(DubbedAudio)
    rendered_row = first_row_for_video(DubbedVideo)

    # Missing artefacts are reported as "" rather than being left out.
    transcript_text = transcription_row.text if transcription_row else ""
    translated = translation_row.text if translation_row else ""
    audio_url = audio_row.s3_url if audio_row else ""
    rendered_url = rendered_row.s3_url if rendered_row else ""

    return VideoResultsResponse(
        video_id=owned_video.id,
        original_video_url=owned_video.original_s3_url,
        transcript=transcript_text,
        translated_text=translated,
        dubbed_audio_url=audio_url,
        final_dubbed_video_url=rendered_url,
        processing_status=owned_video.status,
    )