2025-06-05 14:29:01 +00:00

100 lines
2.9 KiB
Python

import os
import shutil
import uuid
from typing import Optional
from fastapi import HTTPException, UploadFile
from app.core.config import settings
def validate_file_size(file: UploadFile, max_size: int = settings.MAX_UPLOAD_SIZE) -> None:
    """Reject an upload whose size is greater than ``max_size`` bytes.

    The size is measured by seeking the underlying file object to its end and
    reading the resulting offset. The stream is then rewound to offset 0, so
    the next read starts at the beginning of the file.

    Args:
        file: The upload to check; ``file.file`` must support ``seek``/``tell``.
        max_size: Largest accepted size in bytes. The default is
            ``settings.MAX_UPLOAD_SIZE`` as evaluated when this function is
            defined.

    Raises:
        HTTPException: With status code 413 when the measured size is larger
            than ``max_size``.
    """
    stream = file.file

    # Seeking to the end makes tell() report the total size in bytes.
    stream.seek(0, os.SEEK_END)
    file_size = stream.tell()

    # Rewind before any early exit so later readers start at the first byte.
    stream.seek(0)

    if file_size <= max_size:
        return

    raise HTTPException(
        status_code=413,
        detail=f"File size ({file_size} bytes) exceeds maximum allowed size ({max_size} bytes)",
    )
def validate_audio_file(file: UploadFile) -> None:
    """Check that an upload declares an accepted audio type and is not too large.

    Only the declared ``content_type`` of the upload is compared against the
    allow-list; the file's bytes are not inspected here. When the type is
    accepted, the size check is delegated to ``validate_file_size`` with its
    default limit.

    Args:
        file: The upload to validate.

    Raises:
        HTTPException: With status code 415 when ``file.content_type`` is not
            in the allow-list, or whatever ``validate_file_size`` raises for
            an oversized file.
    """
    allowed_content_types = (
        "audio/mpeg",  # MP3
        "audio/wav",  # WAV
        "audio/ogg",  # OGG
        "audio/flac",  # FLAC
        "audio/aac",  # AAC
    )

    if file.content_type in allowed_content_types:
        # Type is acceptable; the remaining check is the size limit.
        validate_file_size(file)
        return

    raise HTTPException(
        status_code=415,
        detail=f"Unsupported file type: {file.content_type}. Allowed types: {', '.join(allowed_content_types)}",
    )
def validate_image_file(file: UploadFile) -> None:
    """Check that an upload declares an accepted image type and is not too large.

    Only the declared ``content_type`` of the upload is compared against the
    allow-list; the file's bytes are not inspected here. When the type is
    accepted, the size check is delegated to ``validate_file_size`` with its
    default limit.

    Args:
        file: The upload to validate.

    Raises:
        HTTPException: With status code 415 when ``file.content_type`` is not
            in the allow-list, or whatever ``validate_file_size`` raises for
            an oversized file.
    """
    allowed_content_types = (
        "image/jpeg",  # JPEG, JPG
        "image/png",  # PNG
        "image/gif",  # GIF
        "image/webp",  # WEBP
    )

    if file.content_type in allowed_content_types:
        # Type is acceptable; the remaining check is the size limit.
        validate_file_size(file)
        return

    raise HTTPException(
        status_code=415,
        detail=f"Unsupported file type: {file.content_type}. Allowed types: {', '.join(allowed_content_types)}",
    )
def save_upload_file(
    upload_file: UploadFile,
    destination_folder: str,
    file_id: Optional[str] = None,
) -> str:
    """Save an uploaded file into ``destination_folder`` under a generated name.

    The stored name is ``file_id`` (or a fresh UUID4 string when ``file_id``
    is falsy) followed by the extension taken from the upload's original
    filename. The original base name is never used on disk.

    Args:
        upload_file: The upload to persist. Its ``file`` attribute is copied
            from its current position to the end of the stream.
        destination_folder: Directory to write into; created (including
            parents) if it does not exist yet.
        file_id: Optional base name for the stored file. When omitted or
            empty, a random UUID4 is used instead.

    Returns:
        The stored file name only (for example ``"<id>.mp3"``), not a path.
        Join it with ``destination_folder`` to locate the file.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written.
    """
    # Ensure destination directory exists
    os.makedirs(destination_folder, exist_ok=True)

    # The upload may carry no filename at all (None); fall back to an empty
    # name so the file is stored without an extension instead of failing
    # with a TypeError inside os.path.splitext.
    original_name = upload_file.filename or ""
    _, ext = os.path.splitext(original_name)

    # Generate unique filename
    filename = f"{file_id or str(uuid.uuid4())}{ext}"
    file_path = os.path.join(destination_folder, filename)

    # Stream the payload to disk without loading it fully into memory
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(upload_file.file, buffer)

    return filename
def remove_file(file_path: str) -> bool:
    """Delete ``file_path`` from disk on a best-effort basis.

    Args:
        file_path: Path of the file to delete.

    Returns:
        True when the path existed and was removed. False when the path does
        not exist or when any error occurs while checking or removing it;
        this function never raises.
    """
    try:
        if not os.path.exists(file_path):
            # Nothing to delete.
            return False
        os.remove(file_path)
    except Exception:
        # Deliberate best-effort: callers only get a success flag.
        return False
    return True