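"""Web scraping service module.

Provides the `Scraper` class: rate-limited URL fetching, HTML parsing
with BeautifulSoup, and execution of database-backed scrape jobs.
"""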

import time
from datetime import datetime
from typing import Dict, Any, Optional

import requests
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.scrape_job import ScrapeJob, JobStatus
from app.models.scrape_result import ScrapeResult


class Scraper:
    """
    Web scraping service.

    Fetches URLs with a configurable user agent, timeout, and rate limit,
    parses the returned HTML, and records results for scrape jobs stored
    in the database.
    """

    def __init__(
        self,
        db: Session,
        user_agent: Optional[str] = None,
        timeout: Optional[int] = None,
        rate_limit: Optional[float] = None,
    ):
        self.db = db
        self.user_agent = user_agent or settings.DEFAULT_USER_AGENT
        self.timeout = timeout or settings.DEFAULT_TIMEOUT
        # Maximum number of requests per second.
        self.rate_limit = rate_limit or settings.DEFAULT_RATE_LIMIT
        self._last_request_time = 0.0

    def _respect_rate_limit(self) -> None:
        """
        Sleep if necessary so that requests stay under `rate_limit`
        requests per second.
        """
        current_time = time.time()
        time_since_last_request = current_time - self._last_request_time
        # At N requests/second, consecutive requests must be at least
        # 1/N seconds apart.
        if time_since_last_request < (1.0 / self.rate_limit):
            sleep_time = (1.0 / self.rate_limit) - time_since_last_request
            time.sleep(sleep_time)
        self._last_request_time = time.time()

    def fetch_url(self, url: str) -> requests.Response:
        """
        Fetch a URL, respecting the rate limit.

        Raises `requests.HTTPError` for 4xx/5xx responses.
        """
        self._respect_rate_limit()
        headers = {
            "User-Agent": self.user_agent,
        }
        response = requests.get(
            url,
            headers=headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response

    def parse_html(self, html: str, selector: Optional[str] = None) -> Dict[str, Any]:
        """
        Parse HTML and extract the title, meta description, h1 headings,
        and links. If a CSS selector is given, also extract the text and
        markup of the matching elements.
        """
        soup = BeautifulSoup(html, "lxml")
        result = {
            "title": soup.title.text if soup.title else None,
            "meta_description": None,
            "h1": [h1.text.strip() for h1 in soup.find_all("h1")],
            "links": [
                {"href": a.get("href"), "text": a.text.strip()}
                for a in soup.find_all("a")
                if a.get("href")
            ],
        }

        # Extract the meta description, if present.
        meta_desc = soup.find("meta", attrs={"name": "description"})
        if meta_desc:
            result["meta_description"] = meta_desc.get("content")

        # If a selector is provided, extract content matching the selector.
        if selector:
            selected_elements = soup.select(selector)
            result["selected_content"] = [
                element.text.strip() for element in selected_elements
            ]
            result["selected_html"] = [str(element) for element in selected_elements]

        return result

    def run_job(self, job_id: int) -> ScrapeJob:
        """
        Run a scraping job: fetch the job's URL, store the raw response as
        a ScrapeResult, parse it, and update the job's status.
        """
        # Get the job from the DB.
        job = self.db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        if not job:
            raise ValueError(f"Job with ID {job_id} not found")

        # Mark the job as started.
        job.status = JobStatus.IN_PROGRESS
        job.started_at = datetime.now()
        self.db.commit()
        self.db.refresh(job)

        try:
            # Fetch the URL.
            response = self.fetch_url(job.url)

            # Store the raw response.
            result = ScrapeResult(
                job_id=job.id,
                content_type=response.headers.get("Content-Type"),
                headers=dict(response.headers),
                html_content=response.text,
            )
            self.db.add(result)
            self.db.commit()
            self.db.refresh(result)

            # Parse the HTML and attach the extracted data to the result.
            extracted_data = self.parse_html(response.text, job.selector)
            result.extracted_data = extracted_data
            self.db.commit()
            self.db.refresh(result)

            # Mark the job as completed.
            job.status = JobStatus.COMPLETED
            job.completed_at = datetime.now()
            job.result = {"result_id": result.id}
            self.db.commit()
            self.db.refresh(job)
            return job
        except Exception as e:
            # Record the failure on the job, then re-raise with the
            # original traceback (bare `raise` instead of `raise e`).
            job.status = JobStatus.FAILED
            job.completed_at = datetime.now()
            job.error = str(e)
            self.db.commit()
            self.db.refresh(job)
            raise
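

# Minimal usage sketch. Assumptions (not part of this module): a
# `SessionLocal` session factory in `app.db.session`, and an existing
# ScrapeJob row with ID 1; adjust both to match your project.
if __name__ == "__main__":
    from app.db.session import SessionLocal

    db = SessionLocal()
    try:
        scraper = Scraper(db, rate_limit=1.0)  # at most one request per second
        job = scraper.run_job(job_id=1)
        print(job.status, job.result)
    finally:
        db.close()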