
- Set up FastAPI project structure - Implement database models and migrations for file metadata - Create file upload endpoint with size validation - Implement file download and listing functionality - Add health check and API information endpoints - Create comprehensive documentation
119 lines
3.3 KiB
Python
119 lines
3.3 KiB
Python
import os
|
|
import shutil
|
|
from fastapi import UploadFile, HTTPException, status
|
|
from pathlib import Path
|
|
from typing import BinaryIO
|
|
|
|
from app.core.config import settings
|
|
from app.models.file import File
|
|
|
|
|
|
class FileService:
|
|
"""Service for handling file operations."""
|
|
|
|
@staticmethod
|
|
async def save_file(upload_file: UploadFile) -> tuple[str, str, int]:
|
|
"""
|
|
Save an uploaded file to disk.
|
|
|
|
Args:
|
|
upload_file: The uploaded file
|
|
|
|
Returns:
|
|
Tuple of (unique_filename, file_path, file_size)
|
|
|
|
Raises:
|
|
HTTPException: If file size exceeds the maximum allowed size
|
|
"""
|
|
# Generate a unique filename to prevent collisions
|
|
original_filename = upload_file.filename or "unnamed_file"
|
|
unique_filename = File.generate_unique_filename(original_filename)
|
|
|
|
# Define the path where the file will be saved
|
|
file_path = settings.FILE_STORAGE_DIR / unique_filename
|
|
|
|
# Check file size
|
|
# First, we need to get the file size
|
|
upload_file.file.seek(0, os.SEEK_END)
|
|
file_size = upload_file.file.tell()
|
|
upload_file.file.seek(0) # Reset file position
|
|
|
|
if file_size > settings.MAX_FILE_SIZE:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
|
|
detail=f"File size exceeds the maximum allowed size of {settings.MAX_FILE_SIZE} bytes",
|
|
)
|
|
|
|
# Save the file
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(upload_file.file, buffer)
|
|
|
|
return unique_filename, str(file_path), file_size
|
|
|
|
@staticmethod
|
|
def get_file_path(filename: str) -> Path:
|
|
"""
|
|
Get the full path for a file.
|
|
|
|
Args:
|
|
filename: The filename to retrieve
|
|
|
|
Returns:
|
|
Path object pointing to the file
|
|
"""
|
|
return settings.FILE_STORAGE_DIR / filename
|
|
|
|
@staticmethod
|
|
def file_exists(filename: str) -> bool:
|
|
"""
|
|
Check if a file exists on disk.
|
|
|
|
Args:
|
|
filename: The filename to check
|
|
|
|
Returns:
|
|
True if the file exists, False otherwise
|
|
"""
|
|
file_path = FileService.get_file_path(filename)
|
|
return file_path.exists() and file_path.is_file()
|
|
|
|
@staticmethod
|
|
def delete_file(filename: str) -> bool:
|
|
"""
|
|
Delete a file from disk.
|
|
|
|
Args:
|
|
filename: The filename to delete
|
|
|
|
Returns:
|
|
True if the file was deleted, False otherwise
|
|
"""
|
|
file_path = FileService.get_file_path(filename)
|
|
if file_path.exists() and file_path.is_file():
|
|
file_path.unlink()
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def get_file_content(filename: str) -> BinaryIO:
|
|
"""
|
|
Get file content as a binary file object.
|
|
|
|
Args:
|
|
filename: The filename to retrieve
|
|
|
|
Returns:
|
|
File object opened in binary mode
|
|
|
|
Raises:
|
|
HTTPException: If the file does not exist
|
|
"""
|
|
file_path = FileService.get_file_path(filename)
|
|
if not file_path.exists() or not file_path.is_file():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="File not found",
|
|
)
|
|
|
|
return open(file_path, "rb")
|