import time
from datetime import datetime
from typing import Dict, Any, Optional

import requests
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.scrape_job import ScrapeJob, JobStatus
from app.models.scrape_result import ScrapeResult


class Scraper:
    """
    Service for web scraping: fetches pages with basic rate limiting, parses the
    HTML, and records results for scrape jobs stored in the database.
    """

    def __init__(
        self,
        db: Session,
        user_agent: Optional[str] = None,
        timeout: Optional[int] = None,
        rate_limit: Optional[float] = None,
    ):
        self.db = db
        # Fall back to application-wide defaults when no overrides are given.
        self.user_agent = user_agent or settings.DEFAULT_USER_AGENT
        self.timeout = timeout or settings.DEFAULT_TIMEOUT
        # Maximum number of requests per second enforced by _respect_rate_limit().
        self.rate_limit = rate_limit or settings.DEFAULT_RATE_LIMIT
        # time.time() of the most recent request; 0 means no request has been made yet.
        self._last_request_time = 0

    def _respect_rate_limit(self) -> None:
        """
        Sleep long enough that at most `rate_limit` requests are made per second.
        """
        current_time = time.time()
        time_since_last_request = current_time - self._last_request_time

        # The minimum spacing between requests is 1 / rate_limit seconds; sleep
        # for whatever part of that interval has not yet elapsed.
        if time_since_last_request < (1.0 / self.rate_limit):
            sleep_time = (1.0 / self.rate_limit) - time_since_last_request
            time.sleep(sleep_time)

        self._last_request_time = time.time()

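    # Worked example of the arithmetic above (illustrative values): with
    # rate_limit=2.0 the minimum spacing is 1.0 / 2.0 = 0.5 s, so a call made
    # 0.2 s after the previous request sleeps for roughly 0.3 s before returning.
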
    def fetch_url(self, url: str) -> requests.Response:
        """
        Fetch a URL with the configured User-Agent and timeout, respecting the
        rate limit. Raises requests.HTTPError for 4xx/5xx responses.
        """
        self._respect_rate_limit()

        headers = {
            "User-Agent": self.user_agent,
        }

        response = requests.get(
            url,
            headers=headers,
            timeout=self.timeout,
        )
        # Surface HTTP errors to the caller instead of silently storing error pages.
        response.raise_for_status()

        return response

    def parse_html(self, html: str, selector: Optional[str] = None) -> Dict[str, Any]:
        """
        Parse HTML and extract the title, meta description, h1 headings, and links.
        If a CSS selector is given, also return the text and HTML of matching elements.
        """
        soup = BeautifulSoup(html, "lxml")
        result = {
            "title": soup.title.text if soup.title else None,
            "meta_description": None,
            "h1": [h1.text.strip() for h1 in soup.find_all("h1")],
            "links": [
                {"href": a.get("href"), "text": a.text.strip()}
                for a in soup.find_all("a")
                if a.get("href")
            ],
        }

        # Extract meta description
        meta_desc = soup.find("meta", attrs={"name": "description"})
        if meta_desc:
            result["meta_description"] = meta_desc.get("content")

        # If a selector is provided, extract content matching the selector
        if selector:
            selected_elements = soup.select(selector)
            result["selected_content"] = [
                element.text.strip() for element in selected_elements
            ]
            result["selected_html"] = [str(element) for element in selected_elements]

        return result

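    # Illustrative sketch of the dictionary parse_html() builds, using made-up
    # HTML (any Session works for the constructor, since parse_html() does not
    # touch the database):
    #
    #   Scraper(db).parse_html(
    #       '<html><head><title>Hi</title>'
    #       '<meta name="description" content="Demo"></head>'
    #       '<body><h1>Hello</h1><a href="/a">A</a></body></html>'
    #   )
    #   # -> {"title": "Hi", "meta_description": "Demo", "h1": ["Hello"],
    #   #     "links": [{"href": "/a", "text": "A"}]}
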
    def run_job(self, job_id: int) -> ScrapeJob:
        """
        Run a scraping job: fetch the job's URL, store the raw response as a
        ScrapeResult, parse it, and mark the job COMPLETED or FAILED.
        """
        # Get job from DB
        job = self.db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        if not job:
            raise ValueError(f"Job with ID {job_id} not found")

        # Update job status
        job.status = JobStatus.IN_PROGRESS
        job.started_at = datetime.now()
        self.db.commit()
        self.db.refresh(job)

        try:
            # Fetch URL
            response = self.fetch_url(job.url)

            # Create ScrapeResult
            result = ScrapeResult(
                job_id=job.id,
                content_type=response.headers.get("Content-Type"),
                headers=dict(response.headers),
                html_content=response.text,
            )
            self.db.add(result)
            self.db.commit()
            self.db.refresh(result)

            # Parse HTML
            extracted_data = self.parse_html(response.text, job.selector)

            # Update ScrapeResult with extracted data
            result.extracted_data = extracted_data
            self.db.commit()
            self.db.refresh(result)

            # Update job status
            job.status = JobStatus.COMPLETED
            job.completed_at = datetime.now()
            job.result = {"result_id": result.id}
            self.db.commit()
            self.db.refresh(job)

            return job

        except Exception as e:
            # Record the failure on the job before re-raising.
            job.status = JobStatus.FAILED
            job.completed_at = datetime.now()
            job.error = str(e)
            self.db.commit()
            self.db.refresh(job)

            # Bare raise preserves the original traceback for the caller.
            raise
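
# Example usage (a sketch, not part of the service). It assumes a SQLAlchemy
# session factory such as `SessionLocal` from `app.db.session`, which this file
# does not define, and an existing ScrapeJob row with id=1:
#
#   from app.db.session import SessionLocal
#
#   db = SessionLocal()
#   try:
#       scraper = Scraper(db, rate_limit=2.0)  # at most two requests per second
#       job = scraper.run_job(1)
#       print(job.status, job.result)
#   finally:
#       db.close()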