72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
import uuid
|
|
from pathlib import Path
|
|
from fastapi import UploadFile, HTTPException
|
|
from PIL import Image
|
|
import aiofiles
|
|
|
|
UPLOAD_DIR = Path("/app/storage/uploads")
|
|
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
|
|
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
|
|
|
|
|
|
async def save_upload_file(upload_file: UploadFile, subfolder: str = "general") -> str:
|
|
"""Save uploaded file and return the file path"""
|
|
|
|
if upload_file.size > MAX_FILE_SIZE:
|
|
raise HTTPException(status_code=413, detail="File too large")
|
|
|
|
if upload_file.content_type not in ALLOWED_IMAGE_TYPES:
|
|
raise HTTPException(status_code=400, detail="Invalid file type")
|
|
|
|
# Create directory if it doesn't exist
|
|
upload_path = UPLOAD_DIR / subfolder
|
|
upload_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Generate unique filename
|
|
file_extension = upload_file.filename.split(".")[-1]
|
|
unique_filename = f"{uuid.uuid4()}.{file_extension}"
|
|
file_path = upload_path / unique_filename
|
|
|
|
# Save file
|
|
async with aiofiles.open(file_path, "wb") as f:
|
|
content = await upload_file.read()
|
|
await f.write(content)
|
|
|
|
# Optimize image if it's an image
|
|
if upload_file.content_type.startswith("image/"):
|
|
optimize_image(str(file_path))
|
|
|
|
return f"/uploads/{subfolder}/{unique_filename}"
|
|
|
|
|
|
def optimize_image(
|
|
file_path: str, max_width: int = 1024, max_height: int = 1024, quality: int = 85
|
|
):
|
|
"""Optimize image size and quality"""
|
|
try:
|
|
with Image.open(file_path) as img:
|
|
# Convert to RGB if needed
|
|
if img.mode in ("RGBA", "P"):
|
|
img = img.convert("RGB")
|
|
|
|
# Resize if too large
|
|
if img.width > max_width or img.height > max_height:
|
|
img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
|
|
|
|
# Save with optimization
|
|
img.save(file_path, optimize=True, quality=quality)
|
|
except Exception as e:
|
|
print(f"Image optimization failed: {e}")
|
|
|
|
|
|
def delete_file(file_path: str) -> bool:
|
|
"""Delete a file from storage"""
|
|
try:
|
|
full_path = Path("/app/storage") / file_path.lstrip("/")
|
|
if full_path.exists():
|
|
full_path.unlink()
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|