import uuid import aiofiles from pathlib import Path from typing import Optional, Tuple from fastapi import UploadFile from PIL import Image import magic from app.models.media import MediaType class MediaService: def __init__(self): self.storage_path = Path("/app/storage/media") self.thumbnails_path = Path("/app/storage/thumbnails") self.storage_path.mkdir(parents=True, exist_ok=True) self.thumbnails_path.mkdir(parents=True, exist_ok=True) # Max file sizes (in bytes) self.max_file_sizes = { MediaType.IMAGE: 10 * 1024 * 1024, # 10MB MediaType.VIDEO: 100 * 1024 * 1024, # 100MB MediaType.AUDIO: 50 * 1024 * 1024, # 50MB MediaType.DOCUMENT: 25 * 1024 * 1024 # 25MB } # Allowed MIME types self.allowed_types = { MediaType.IMAGE: [ "image/jpeg", "image/png", "image/gif", "image/webp", "image/bmp" ], MediaType.VIDEO: [ "video/mp4", "video/avi", "video/mov", "video/wmv", "video/flv", "video/webm" ], MediaType.AUDIO: [ "audio/mp3", "audio/wav", "audio/ogg", "audio/m4a", "audio/flac" ], MediaType.DOCUMENT: [ "application/pdf", "application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-powerpoint", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "text/plain", "text/csv", "application/zip", "application/x-rar-compressed" ] } async def save_file(self, file: UploadFile, uploader_id: int) -> Tuple[str, str, int, MediaType, Optional[int], Optional[int], Optional[str]]: """ Save uploaded file and return file info Returns: (filename, file_path, file_size, media_type, width, height, thumbnail_path) """ # Read file content content = await file.read() file_size = len(content) # Detect MIME type mime_type = magic.from_buffer(content, mime=True) # Determine media type media_type = self._get_media_type(mime_type) # Validate file type and size self._validate_file(media_type, mime_type, file_size) # Generate unique filename file_extension = self._get_file_extension(file.filename, mime_type) filename = f"{uuid.uuid4()}{file_extension}" file_path = self.storage_path / filename # Save file async with aiofiles.open(file_path, 'wb') as f: await f.write(content) # Process media (get dimensions, create thumbnail) width, height, thumbnail_path = await self._process_media( file_path, media_type, filename ) return str(filename), str(file_path), file_size, media_type, width, height, thumbnail_path def _get_media_type(self, mime_type: str) -> MediaType: """Determine media type from MIME type""" for media_type, allowed_mimes in self.allowed_types.items(): if mime_type in allowed_mimes: return media_type return MediaType.OTHER def _validate_file(self, media_type: MediaType, mime_type: str, file_size: int): """Validate file type and size""" # Check if MIME type is allowed if media_type in self.allowed_types: if mime_type not in self.allowed_types[media_type]: raise ValueError(f"File type {mime_type} not allowed") # Check file size max_size = self.max_file_sizes.get(media_type, 10 * 1024 * 1024) # Default 10MB if file_size > max_size: raise ValueError(f"File size {file_size} exceeds maximum {max_size}") def _get_file_extension(self, original_filename: str, mime_type: str) -> str: """Get appropriate file extension""" if original_filename and '.' in original_filename: return '.' + original_filename.rsplit('.', 1)[1].lower() # Fallback based on MIME type mime_extensions = { 'image/jpeg': '.jpg', 'image/png': '.png', 'image/gif': '.gif', 'video/mp4': '.mp4', 'audio/mp3': '.mp3', 'application/pdf': '.pdf', 'text/plain': '.txt' } return mime_extensions.get(mime_type, '.bin') async def _process_media(self, file_path: Path, media_type: MediaType, filename: str) -> Tuple[Optional[int], Optional[int], Optional[str]]: """Process media file (get dimensions, create thumbnails)""" width, height, thumbnail_path = None, None, None if media_type == MediaType.IMAGE: try: with Image.open(file_path) as img: width, height = img.size # Create thumbnail thumbnail_filename = f"thumb_{filename}" thumbnail_path = self.thumbnails_path / thumbnail_filename # Create thumbnail (max 300x300) img.thumbnail((300, 300), Image.Resampling.LANCZOS) img.save(thumbnail_path, format='JPEG', quality=85) thumbnail_path = str(thumbnail_path) except Exception as e: print(f"Error processing image {filename}: {e}") elif media_type == MediaType.VIDEO: # For videos, you'd typically use ffmpeg to get dimensions and create thumbnails # This is a simplified version try: # You would use ffprobe to get video dimensions # For now, we'll set default values width, height = 1920, 1080 # Default values # Create video thumbnail using ffmpeg # thumbnail_filename = f"thumb_{filename}.jpg" # thumbnail_path = self.thumbnails_path / thumbnail_filename # ... ffmpeg command to extract frame ... except Exception as e: print(f"Error processing video {filename}: {e}") return width, height, thumbnail_path async def get_file_content(self, filename: str) -> bytes: """Get file content""" file_path = self.storage_path / filename if not file_path.exists(): raise FileNotFoundError(f"File {filename} not found") async with aiofiles.open(file_path, 'rb') as f: return await f.read() async def get_thumbnail_content(self, filename: str) -> bytes: """Get thumbnail content""" thumbnail_path = self.thumbnails_path / f"thumb_{filename}" if not thumbnail_path.exists(): raise FileNotFoundError(f"Thumbnail for {filename} not found") async with aiofiles.open(thumbnail_path, 'rb') as f: return await f.read() def delete_file(self, filename: str, thumbnail_filename: Optional[str] = None): """Delete file and its thumbnail""" file_path = self.storage_path / filename if file_path.exists(): file_path.unlink() if thumbnail_filename: thumbnail_path = self.thumbnails_path / thumbnail_filename if thumbnail_path.exists(): thumbnail_path.unlink() # Global media service instance media_service = MediaService()