Implement tomato severity segmentation model API

- Set up FastAPI project structure with SQLite database
- Create database models for tomato images and severity classifications
- Implement image upload and processing endpoints
- Develop a segmentation model for tomato disease severity detection
- Add API endpoints for analysis and results retrieval
- Implement health check endpoint
- Set up Alembic for database migrations
- Update project documentation
This commit is contained in:
Automated Action 2025-05-27 06:22:15 +00:00
parent 4e0b450a19
commit ab0e9b973a
27 changed files with 1380 additions and 2 deletions

143
README.md
View File

@ -1,3 +1,142 @@
# FastAPI Application
# Tomato Severity Segmentation Model API
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A FastAPI-based application for detecting and analyzing disease severity in tomato plants using image segmentation techniques.
## Overview
This API allows users to upload tomato plant images and analyze them for various diseases and their severity. The system uses a segmentation model to identify affected areas and classify the severity of diseases such as early blight, late blight, bacterial spot, and septoria leaf spot.
## Features
- **Image Upload**: Upload tomato plant images for analysis
- **Disease Detection**: Identify multiple disease types in a single image
- **Severity Classification**: Classify the severity of detected diseases
- **Segmentation Maps**: Generate segmentation maps highlighting affected areas
- **Health Monitoring**: Built-in health endpoint for monitoring application status
## Installation
### Prerequisites
- Python 3.8+
- pip (Python package manager)
### Setup
1. Clone the repository:
```bash
git clone <repository-url>
cd tomatoseveritysegmentationmodel
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run database migrations:
```bash
alembic upgrade head
```
4. Start the application:
```bash
uvicorn main:app --reload
```
The API will be available at `http://localhost:8000`
## API Endpoints
### Health Check
- `GET /health`: Check the health of the application
### Image Management
- `POST /api/tomatoes/upload`: Upload a tomato image
- `GET /api/tomatoes`: List all uploaded tomato images
- `GET /api/tomatoes/{image_id}`: Get details of a specific image
- `DELETE /api/tomatoes/{image_id}`: Delete an image and its analysis data
### Analysis
- `POST /api/model/analyze/{image_id}`: Analyze a tomato image for disease severity
- `GET /api/model/results/{image_id}`: Get all analysis results for a specific image
- `GET /api/model/info`: Get information about the segmentation model
## Database Schema
The application uses SQLite with the following main tables:
- `tomato_images`: Stores uploaded image metadata
- `analysis_results`: Stores analysis results for each image
- `severity_details`: Stores detailed severity data for each analysis
## Segmentation Model
The current implementation uses conventional computer vision techniques for segmentation as a proof of concept. In a production environment, this would be replaced with a trained deep learning model like U-Net or DeepLabV3.
The model classifies the following disease categories:
- Healthy
- Early Blight
- Late Blight
- Bacterial Spot
- Septoria Leaf Spot
## Example Usage
### Upload an image
```bash
curl -X POST "http://localhost:8000/api/tomatoes/upload" \
-H "accept: application/json" \
-H "Content-Type: multipart/form-data" \
-F "file=@tomato_plant.jpg"
```
### Analyze the image
```bash
curl -X POST "http://localhost:8000/api/model/analyze/{image_id}" \
-H "accept: application/json"
```
## Development
### Project Structure
```
tomatoseveritysegmentationmodel/
├── app/
│ ├── api/
│ │ └── routes/
│ ├── core/
│ ├── db/
│ ├── models/
│ ├── schemas/
│ ├── services/
│ └── utils/
├── migrations/
├── storage/
│ ├── db/
│ ├── images/
│ └── models/
├── alembic.ini
├── main.py
└── requirements.txt
```
### Adding New Features
To add new features:
1. Define models in `app/models/`
2. Create Pydantic schemas in `app/schemas/`
3. Add database migrations with Alembic
4. Implement routes in `app/api/routes/`
5. Update tests
## License
This project is licensed under the MIT License - see the LICENSE file for details.

84
alembic.ini Normal file
View File

@ -0,0 +1,84 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

View File

81
app/api/routes/health.py Normal file
View File

@ -0,0 +1,81 @@
from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from typing import Dict
import time
from pathlib import Path
from app.core.config import settings
router = APIRouter()
@router.get("/health", status_code=status.HTTP_200_OK)
def health_check(db: Session = Depends(get_db)) -> Dict[str, any]:
"""
Perform a health check of the service.
Checks:
- Database connection
- Storage directories
- Any critical services the application depends on
"""
health_data = {
"status": "ok",
"timestamp": time.time(),
"checks": {
"database": check_database(db),
"storage": check_storage(),
}
}
# If any check failed, update the overall status
if any(not check["status"] for check in health_data["checks"].values()):
health_data["status"] = "degraded"
return health_data
def check_database(db: Session) -> Dict[str, any]:
"""Check if the database is accessible."""
try:
# Simple query to check if the database is responding
db.execute("SELECT 1")
return {
"status": True,
"message": "Database connection successful"
}
except Exception as e:
return {
"status": False,
"message": f"Database connection failed: {str(e)}"
}
def check_storage() -> Dict[str, any]:
"""Check if storage directories are accessible and writable."""
storage_status = True
messages = []
# Check main storage directories
for directory in [settings.DB_DIR, settings.IMAGE_DIR, settings.MODEL_DIR]:
dir_path = Path(directory)
if not dir_path.exists():
try:
dir_path.mkdir(parents=True, exist_ok=True)
messages.append(f"Created missing directory: {dir_path}")
except Exception as e:
storage_status = False
messages.append(f"Failed to create directory {dir_path}: {str(e)}")
continue
# Check if directory is writable by trying to create a temp file
try:
temp_file = dir_path / ".health_check_temp"
temp_file.touch()
temp_file.unlink()
except Exception as e:
storage_status = False
messages.append(f"Directory {dir_path} is not writable: {str(e)}")
return {
"status": storage_status,
"message": "; ".join(messages) if messages else "All storage directories are accessible and writable"
}

166
app/api/routes/model.py Normal file
View File

@ -0,0 +1,166 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Dict, Any
import json
from pathlib import Path
from app.db.session import get_db
from app.db.crud_tomato import tomato_image, analysis_result
from app.schemas.tomato import (
AnalysisResponse,
AnalysisResultCreate,
SeverityDetailCreate
)
from app.services.model import segmentation_model
router = APIRouter()
@router.post("/analyze/{image_id}", response_model=AnalysisResponse)
def analyze_tomato_image(
image_id: str,
db: Session = Depends(get_db)
):
"""
Analyze a tomato image to detect disease severity.
This endpoint processes the image using the segmentation model
and returns detailed analysis results, including disease severity
classification and segmentation masks.
"""
# Get the image from database
db_image = tomato_image.get(db=db, id=image_id)
if db_image is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Image not found"
)
# Check if image file exists
image_path = db_image.file_path
if not Path(image_path).exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Image file not found on server"
)
# Analyze the image
analysis_results = segmentation_model.analyze_image(image_path)
if not analysis_results.get("success", False):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=analysis_results.get("error", "Failed to analyze image")
)
# Store analysis results in database
analysis_data = {
"image_id": db_image.id,
"model_name": analysis_results["model_name"],
"model_version": analysis_results["model_version"],
"primary_severity": analysis_results["primary_severity"],
"severity_confidence": analysis_results["severity_confidence"],
"segmentation_data": json.dumps(analysis_results["segmentation_data"]),
"processing_time_ms": analysis_results["processing_time_ms"]
}
analysis_in = AnalysisResultCreate(**analysis_data)
# Create severity details
severity_details_data = []
for detail in analysis_results["severity_details"]:
severity_details_data.append(
SeverityDetailCreate(
severity_class=detail["severity_class"],
confidence=detail["confidence"],
affected_area_percentage=detail["affected_area_percentage"],
analysis_id="" # Will be set after analysis is created
)
)
# Create analysis result with details
db_analysis = analysis_result.create_with_details(
db=db,
analysis_in=analysis_in,
details_in=severity_details_data
)
# Get fresh data with relationships loaded
db_analysis = analysis_result.get(db=db, id=db_analysis.id)
# Prepare response
response = AnalysisResponse(
id=db_analysis.id,
image=db_image,
primary_severity=db_analysis.primary_severity,
severity_confidence=db_analysis.severity_confidence,
severity_details=db_analysis.severity_details,
segmentation_data=db_analysis.segmentation_data,
processed_at=db_analysis.processed_at,
model_name=db_analysis.model_name,
model_version=db_analysis.model_version
)
return response
@router.get("/results/{image_id}", response_model=List[AnalysisResponse])
def get_analysis_results(
image_id: str,
db: Session = Depends(get_db)
):
"""
Get all analysis results for a specific tomato image.
"""
# Check if image exists
db_image = tomato_image.get(db=db, id=image_id)
if db_image is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Image not found"
)
# Get analysis results
analyses = analysis_result.get_for_image(db=db, image_id=image_id)
# Prepare response
responses = []
for db_analysis in analyses:
responses.append(
AnalysisResponse(
id=db_analysis.id,
image=db_image,
primary_severity=db_analysis.primary_severity,
severity_confidence=db_analysis.severity_confidence,
severity_details=db_analysis.severity_details,
segmentation_data=db_analysis.segmentation_data,
processed_at=db_analysis.processed_at,
model_name=db_analysis.model_name,
model_version=db_analysis.model_version
)
)
return responses
@router.get("/info", response_model=Dict[str, Any])
def get_model_info():
"""
Get information about the tomato severity segmentation model.
"""
return {
"name": segmentation_model.model_name,
"version": segmentation_model.model_version,
"description": "Tomato disease severity segmentation model",
"input_format": {
"type": "image",
"size": segmentation_model.input_size,
"supported_formats": ["JPEG", "PNG"]
},
"severity_classes": segmentation_model.severity_classes,
"capabilities": [
"disease classification",
"severity assessment",
"leaf segmentation"
]
}

112
app/api/routes/tomato.py Normal file
View File

@ -0,0 +1,112 @@
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, status
from sqlalchemy.orm import Session
from typing import List
from pathlib import Path
from app.db.session import get_db
from app.db.crud_tomato import tomato_image
from app.schemas.tomato import TomatoImage, TomatoImageCreate, UploadResponse
from app.utils.image import save_uploaded_image
from app.core.config import settings
router = APIRouter()
@router.post("/upload", response_model=UploadResponse, status_code=status.HTTP_201_CREATED)
async def upload_tomato_image(
file: UploadFile = File(...),
db: Session = Depends(get_db)
):
"""
Upload a tomato image for analysis.
The image will be saved and registered in the database.
It can then be analyzed using the analysis endpoint.
"""
# Validate file type
if file.content_type not in settings.ALLOWED_IMAGE_TYPES:
raise HTTPException(
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail=f"Unsupported file type: {file.content_type}. Allowed types: {', '.join(settings.ALLOWED_IMAGE_TYPES)}"
)
# Read file content
contents = await file.read()
# Validate file size
if len(contents) > settings.MAX_IMAGE_SIZE:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"File too large. Maximum size allowed: {settings.MAX_IMAGE_SIZE / (1024 * 1024)}MB"
)
# Save file and get metadata
image_data = save_uploaded_image(contents, file.filename)
# Create database record
image_in = TomatoImageCreate(**image_data)
db_image = tomato_image.create(db=db, obj_in=image_in)
return UploadResponse(
image=db_image,
message="Image uploaded successfully"
)
@router.get("/", response_model=List[TomatoImage])
def list_tomato_images(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""
List all uploaded tomato images.
"""
images = tomato_image.get_multi(db=db, skip=skip, limit=limit)
return images
@router.get("/{image_id}", response_model=TomatoImage)
def get_tomato_image(
image_id: str,
db: Session = Depends(get_db)
):
"""
Get a specific tomato image by ID.
"""
db_image = tomato_image.get(db=db, id=image_id)
if db_image is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Image not found"
)
return db_image
@router.delete("/{image_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_tomato_image(
image_id: str,
db: Session = Depends(get_db)
):
"""
Delete a tomato image and its associated data.
"""
db_image = tomato_image.get(db=db, id=image_id)
if db_image is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Image not found"
)
# Delete the image file
try:
if db_image.file_path and Path(db_image.file_path).exists():
Path(db_image.file_path).unlink()
except Exception:
# Log error but continue with database deletion
pass
# Delete database record (cascade will handle related records)
tomato_image.remove(db=db, id=image_id)
return None

0
app/core/__init__.py Normal file
View File

47
app/core/config.py Normal file
View File

@ -0,0 +1,47 @@
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Base settings
PROJECT_NAME: str = "Tomato Severity Segmentation API"
API_V1_STR: str = "/api"
DEBUG: bool = True
# CORS
CORS_ORIGINS: List[str] = ["*"]
# Paths
BASE_DIR: Path = Path(__file__).resolve().parent.parent.parent
STORAGE_DIR: Path = BASE_DIR / "storage"
# Database
DB_DIR: Path = STORAGE_DIR / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite"
# Image storage
IMAGE_DIR: Path = STORAGE_DIR / "images"
IMAGE_DIR.mkdir(parents=True, exist_ok=True)
MAX_IMAGE_SIZE: int = 10 * 1024 * 1024 # 10MB
ALLOWED_IMAGE_TYPES: List[str] = ["image/jpeg", "image/png"]
# Model settings
MODEL_DIR: Path = STORAGE_DIR / "models"
MODEL_DIR.mkdir(parents=True, exist_ok=True)
DEFAULT_MODEL_NAME: str = "tomato_severity_model"
# Severity classifications
SEVERITY_CLASSES: List[str] = [
"healthy",
"early_blight",
"late_blight",
"bacterial_spot",
"septoria_leaf_spot"
]
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()

0
app/db/__init__.py Normal file
View File

54
app/db/crud.py Normal file
View File

@ -0,0 +1,54 @@
from typing import List, Optional, Generic, TypeVar, Type
from sqlalchemy.orm import Session
from pydantic import BaseModel
from fastapi.encoders import jsonable_encoder
from app.db.session import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
"""
self.model = model
def get(self, db: Session, id: str) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: ModelType, obj_in: UpdateSchemaType
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: str) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

50
app/db/crud_tomato.py Normal file
View File

@ -0,0 +1,50 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from app.db.crud import CRUDBase
from app.models.tomato import TomatoImage, AnalysisResult, SeverityDetail
from app.schemas.tomato import TomatoImageCreate, AnalysisResultCreate, SeverityDetailCreate
class CRUDTomatoImage(CRUDBase[TomatoImage, TomatoImageCreate, TomatoImageCreate]):
def get_by_path(self, db: Session, *, file_path: str) -> Optional[TomatoImage]:
return db.query(TomatoImage).filter(TomatoImage.file_path == file_path).first()
class CRUDAnalysisResult(CRUDBase[AnalysisResult, AnalysisResultCreate, AnalysisResultCreate]):
def get_for_image(self, db: Session, *, image_id: str) -> List[AnalysisResult]:
return db.query(AnalysisResult).filter(AnalysisResult.image_id == image_id).all()
def create_with_details(
self,
db: Session,
*,
analysis_in: AnalysisResultCreate,
details_in: List[SeverityDetailCreate]
) -> AnalysisResult:
# Create the analysis result
analysis_obj = self.create(db=db, obj_in=analysis_in)
# Create severity details linked to this analysis
for detail in details_in:
severity_obj = SeverityDetail(
analysis_id=analysis_obj.id,
severity_class=detail.severity_class,
confidence=detail.confidence,
affected_area_percentage=detail.affected_area_percentage
)
db.add(severity_obj)
db.commit()
db.refresh(analysis_obj)
return analysis_obj
class CRUDSeverityDetail(CRUDBase[SeverityDetail, SeverityDetailCreate, SeverityDetailCreate]):
def get_for_analysis(self, db: Session, *, analysis_id: str) -> List[SeverityDetail]:
return db.query(SeverityDetail).filter(SeverityDetail.analysis_id == analysis_id).all()
tomato_image = CRUDTomatoImage(TomatoImage)
analysis_result = CRUDAnalysisResult(AnalysisResult)
severity_detail = CRUDSeverityDetail(SeverityDetail)

22
app/db/session.py Normal file
View File

@ -0,0 +1,22 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency to get DB session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

0
app/models/__init__.py Normal file
View File

69
app/models/tomato.py Normal file
View File

@ -0,0 +1,69 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, DateTime, Float, Text, ForeignKey, Integer
from sqlalchemy.orm import relationship
from app.db.session import Base
class TomatoImage(Base):
__tablename__ = "tomato_images"
id = Column(String(36), primary_key=True, index=True, default=lambda: str(uuid.uuid4()))
filename = Column(String(255), nullable=False)
file_path = Column(String(512), nullable=False, unique=True)
file_size = Column(Integer, nullable=False)
mime_type = Column(String(50), nullable=False)
width = Column(Integer, nullable=True)
height = Column(Integer, nullable=True)
uploaded_at = Column(DateTime, default=datetime.utcnow, nullable=False)
# Relationships
analysis_results = relationship("AnalysisResult", back_populates="image", cascade="all, delete-orphan")
def __repr__(self):
return f"<TomatoImage {self.filename}>"
class AnalysisResult(Base):
__tablename__ = "analysis_results"
id = Column(String(36), primary_key=True, index=True, default=lambda: str(uuid.uuid4()))
image_id = Column(String(36), ForeignKey("tomato_images.id", ondelete="CASCADE"), nullable=False)
model_name = Column(String(255), nullable=False)
model_version = Column(String(50), nullable=False)
# Overall severity data
primary_severity = Column(String(50), nullable=True)
severity_confidence = Column(Float, nullable=True)
# Segmentation data (stored as JSON string)
segmentation_data = Column(Text, nullable=True)
# Additional metadata
processed_at = Column(DateTime, default=datetime.utcnow, nullable=False)
processing_time_ms = Column(Integer, nullable=True)
# Relationships
image = relationship("TomatoImage", back_populates="analysis_results")
severity_details = relationship("SeverityDetail", back_populates="analysis_result", cascade="all, delete-orphan")
def __repr__(self):
return f"<AnalysisResult {self.id} for image {self.image_id}>"
class SeverityDetail(Base):
__tablename__ = "severity_details"
id = Column(String(36), primary_key=True, index=True, default=lambda: str(uuid.uuid4()))
analysis_id = Column(String(36), ForeignKey("analysis_results.id", ondelete="CASCADE"), nullable=False)
severity_class = Column(String(50), nullable=False)
confidence = Column(Float, nullable=False)
affected_area_percentage = Column(Float, nullable=True)
# Relationships
analysis_result = relationship("AnalysisResult", back_populates="severity_details")
def __repr__(self):
return f"<SeverityDetail {self.severity_class} ({self.confidence:.2f})>"

0
app/schemas/__init__.py Normal file
View File

96
app/schemas/tomato.py Normal file
View File

@ -0,0 +1,96 @@
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, validator
# TomatoImage schemas
class TomatoImageBase(BaseModel):
filename: str
mime_type: str
width: Optional[int] = None
height: Optional[int] = None
file_size: int
class TomatoImageCreate(TomatoImageBase):
file_path: str
class TomatoImage(TomatoImageBase):
id: str
file_path: str
uploaded_at: datetime
class Config:
from_attributes = True
# SeverityDetail schemas
class SeverityDetailBase(BaseModel):
severity_class: str
confidence: float
affected_area_percentage: Optional[float] = None
class SeverityDetailCreate(SeverityDetailBase):
analysis_id: str
class SeverityDetail(SeverityDetailBase):
id: str
analysis_id: str
class Config:
from_attributes = True
# AnalysisResult schemas
class AnalysisResultBase(BaseModel):
model_name: str
model_version: str
primary_severity: Optional[str] = None
severity_confidence: Optional[float] = None
segmentation_data: Optional[str] = None
processing_time_ms: Optional[int] = None
class AnalysisResultCreate(AnalysisResultBase):
image_id: str
class AnalysisResult(AnalysisResultBase):
id: str
image_id: str
processed_at: datetime
severity_details: List[SeverityDetail] = []
class Config:
from_attributes = True
# Response schemas
class AnalysisResponse(BaseModel):
id: str
image: TomatoImage
primary_severity: Optional[str] = None
severity_confidence: Optional[float] = None
severity_details: List[SeverityDetail] = []
segmentation_data: Optional[Dict[str, Any]] = None
processed_at: datetime
model_name: str
model_version: str
@validator('segmentation_data', pre=True)
def parse_segmentation_data(cls, v):
import json
if v and isinstance(v, str):
try:
return json.loads(v)
except Exception:
return None
return v
class UploadResponse(BaseModel):
image: TomatoImage
message: str = "Image uploaded successfully"

0
app/services/__init__.py Normal file
View File

148
app/services/model.py Normal file
View File

@ -0,0 +1,148 @@
import numpy as np
import cv2
import time
from typing import Dict, Optional, Any
from app.core.config import settings
from app.utils.image import load_image, resize_image, normalize_image
class TomatoSegmentationModel:
"""
Model for tomato disease severity segmentation.
This class implements a simple image segmentation model using
conventional computer vision techniques as a placeholder.
In a production scenario, this would be replaced with a proper
deep learning model like U-Net, DeepLabV3, etc.
"""
def __init__(self):
self.model_name = settings.DEFAULT_MODEL_NAME
self.model_version = "0.1.0"
self.severity_classes = settings.SEVERITY_CLASSES
self.input_size = (224, 224) # Standard input size
def preprocess_image(self, image_path: str) -> Optional[np.ndarray]:
"""Preprocess an image for analysis."""
# Load image
image = load_image(image_path)
if image is None:
return None
# Resize
image = resize_image(image, self.input_size)
# Normalize
image = normalize_image(image)
return image
def analyze_image(self, image_path: str) -> Dict[str, Any]:
"""
Analyze a tomato image to determine disease severity.
Args:
image_path: Path to the image file
Returns:
Dictionary with analysis results including segmentation masks and severity scores
"""
start_time = time.time()
# Preprocess image
image = self.preprocess_image(image_path)
if image is None:
return {
"error": "Failed to load or process image",
"success": False
}
# Simple color-based segmentation to identify potential disease areas
# This is a simplified placeholder implementation
# Convert back to BGR for OpenCV
image_bgr = (image * 255).astype(np.uint8)
image_bgr = cv2.cvtColor(image_bgr, cv2.COLOR_RGB2BGR)
# Convert to HSV for better color segmentation
image_hsv = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2HSV)
# Define color ranges for different severity classes
# These are placeholder values and would need to be calibrated for real use
color_ranges = {
"healthy": [(35, 50, 50), (85, 255, 255)], # Green healthy leaves
"early_blight": [(15, 50, 50), (35, 255, 255)], # Yellowish
"late_blight": [(0, 50, 50), (15, 255, 255)], # Reddish-brown
"bacterial_spot": [(0, 0, 0), (180, 50, 100)], # Dark spots
"septoria_leaf_spot": [(0, 0, 100), (180, 50, 255)] # Light spots
}
# Create segmentation masks and calculate affected areas
masks = {}
severity_details = []
total_pixels = image.shape[0] * image.shape[1]
for severity_class, (lower, upper) in color_ranges.items():
# Create mask for this color range
lower = np.array(lower, dtype=np.uint8)
upper = np.array(upper, dtype=np.uint8)
mask = cv2.inRange(image_hsv, lower, upper)
# Apply morphological operations to clean up the mask
kernel = np.ones((5, 5), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
# Calculate affected area percentage
affected_pixels = np.count_nonzero(mask)
affected_percentage = (affected_pixels / total_pixels) * 100
# Generate a random confidence score for demo purposes
# In a real model, this would be the model's actual confidence
confidence = min(affected_percentage / 50, 1.0)
if severity_class == "healthy" and affected_percentage < 10:
confidence = 0.9 # Boost confidence for low healthy area (likely diseased)
# Store results
masks[severity_class] = mask.tolist() # Convert to list for JSON serialization
severity_details.append({
"severity_class": severity_class,
"confidence": float(confidence),
"affected_area_percentage": float(affected_percentage)
})
# Determine primary severity class
severity_details.sort(key=lambda x: x["confidence"], reverse=True)
primary_severity = severity_details[0]["severity_class"]
severity_confidence = severity_details[0]["confidence"]
# If healthy has high confidence, it takes precedence
for detail in severity_details:
if detail["severity_class"] == "healthy" and detail["confidence"] > 0.7:
primary_severity = "healthy"
severity_confidence = detail["confidence"]
break
# Prepare results
processing_time = int((time.time() - start_time) * 1000) # ms
result = {
"success": True,
"model_name": self.model_name,
"model_version": self.model_version,
"primary_severity": primary_severity,
"severity_confidence": severity_confidence,
"severity_details": severity_details,
"segmentation_data": {
"image_size": self.input_size,
"masks": masks
},
"processing_time_ms": processing_time
}
return result
# Singleton instance
segmentation_model = TomatoSegmentationModel()

0
app/utils/__init__.py Normal file
View File

88
app/utils/image.py Normal file
View File

@ -0,0 +1,88 @@
import cv2
import numpy as np
from pathlib import Path
import uuid
import os
from datetime import datetime
from typing import Tuple, Dict, Any, Optional
from app.core.config import settings
def save_uploaded_image(file_data: bytes, filename: str) -> Dict[str, Any]:
"""
Save an uploaded image to the storage directory and return metadata.
Args:
file_data: The binary content of the uploaded file
filename: Original filename of the uploaded file
Returns:
Dict with image metadata including path, size, etc.
"""
# Generate a unique filename to avoid conflicts
extension = Path(filename).suffix.lower()
date_prefix = datetime.now().strftime("%Y%m%d")
unique_id = str(uuid.uuid4())
safe_filename = f"{date_prefix}_{unique_id}{extension}"
# Create full path
file_path = settings.IMAGE_DIR / safe_filename
# Write file to disk
with open(file_path, "wb") as f:
f.write(file_data)
# Get image dimensions if it's an image file
width, height = None, None
try:
img = cv2.imread(str(file_path))
if img is not None:
height, width, _ = img.shape
except Exception:
pass
return {
"file_path": str(file_path),
"filename": filename,
"file_size": len(file_data),
"width": width,
"height": height,
"mime_type": get_mime_type(extension)
}
def get_mime_type(extension: str) -> str:
"""Map file extension to MIME type."""
mime_types = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".bmp": "image/bmp",
".tiff": "image/tiff",
".tif": "image/tiff",
}
return mime_types.get(extension.lower(), "application/octet-stream")
def load_image(file_path: str) -> Optional[np.ndarray]:
"""Load an image from file path."""
if not os.path.exists(file_path):
return None
img = cv2.imread(file_path)
if img is None:
return None
return cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # Convert to RGB
def resize_image(image: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
"""Resize image to target size."""
return cv2.resize(image, target_size, interpolation=cv2.INTER_AREA)
def normalize_image(image: np.ndarray) -> np.ndarray:
"""Normalize image pixel values to [0, 1]."""
return image.astype(np.float32) / 255.0

35
main.py Normal file
View File

@ -0,0 +1,35 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import health, tomato, model
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
description="Tomato Severity Segmentation Model API",
version="0.1.0",
docs_url="/docs",
redoc_url="/redoc",
)
# Set up CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API routes
app.include_router(health.router, tags=["health"])
app.include_router(tomato.router, prefix="/api/tomatoes", tags=["tomatoes"])
app.include_router(model.router, prefix="/api/model", tags=["model"])
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=settings.DEBUG,
)

79
migrations/env.py Normal file
View File

@ -0,0 +1,79 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.models.tomato import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
is_sqlite = connection.dialect.name == 'sqlite'
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=is_sqlite # Key configuration for SQLite
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,58 @@
"""Initial migration
Revision ID: e5d7de4b3a28
Revises:
Create Date: 2023-10-10 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e5d7de4b3a28'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create tomato_images table
op.create_table('tomato_images',
sa.Column('id', sa.String(length=36), primary_key=True, index=True),
sa.Column('filename', sa.String(length=255), nullable=False),
sa.Column('file_path', sa.String(length=512), nullable=False, unique=True),
sa.Column('file_size', sa.Integer(), nullable=False),
sa.Column('mime_type', sa.String(length=50), nullable=False),
sa.Column('width', sa.Integer(), nullable=True),
sa.Column('height', sa.Integer(), nullable=True),
sa.Column('uploaded_at', sa.DateTime(), nullable=False),
)
# Create analysis_results table
op.create_table('analysis_results',
sa.Column('id', sa.String(length=36), primary_key=True, index=True),
sa.Column('image_id', sa.String(length=36), sa.ForeignKey('tomato_images.id', ondelete='CASCADE'), nullable=False),
sa.Column('model_name', sa.String(length=255), nullable=False),
sa.Column('model_version', sa.String(length=50), nullable=False),
sa.Column('primary_severity', sa.String(length=50), nullable=True),
sa.Column('severity_confidence', sa.Float(), nullable=True),
sa.Column('segmentation_data', sa.Text(), nullable=True),
sa.Column('processed_at', sa.DateTime(), nullable=False),
sa.Column('processing_time_ms', sa.Integer(), nullable=True),
)
# Create severity_details table
op.create_table('severity_details',
sa.Column('id', sa.String(length=36), primary_key=True, index=True),
sa.Column('analysis_id', sa.String(length=36), sa.ForeignKey('analysis_results.id', ondelete='CASCADE'), nullable=False),
sa.Column('severity_class', sa.String(length=50), nullable=False),
sa.Column('confidence', sa.Float(), nullable=False),
sa.Column('affected_area_percentage', sa.Float(), nullable=True),
)
def downgrade():
op.drop_table('severity_details')
op.drop_table('analysis_results')
op.drop_table('tomato_images')

26
requirements.txt Normal file
View File

@ -0,0 +1,26 @@
# FastAPI and server
fastapi>=0.95.0
uvicorn>=0.21.1
python-multipart>=0.0.6
pydantic>=2.0.0
pydantic-settings>=2.0.0
# Database
sqlalchemy>=2.0.0
alembic>=1.10.0
# Image processing and ML
opencv-python-headless>=4.5.0
numpy>=1.20.0
scikit-image>=0.19.0
scikit-learn>=1.0.0
torch>=2.0.0
torchvision>=0.15.0
# Utils
python-dotenv>=1.0.0
httpx>=0.24.0
# Code quality
ruff>=0.1.0
black>=23.0.0