import time
from datetime import datetime
from typing import Dict, Any, Optional

import requests
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.scrape_job import ScrapeJob, JobStatus
from app.models.scrape_result import ScrapeResult


class Scraper:
    """
    Service for web scraping.
    """

    def __init__(
        self,
        db: Session,
        user_agent: Optional[str] = None,
        timeout: Optional[int] = None,
        rate_limit: Optional[float] = None,
    ):
        self.db = db
        self.user_agent = user_agent or settings.DEFAULT_USER_AGENT
        self.timeout = timeout or settings.DEFAULT_TIMEOUT
        # Maximum number of requests per second.
        self.rate_limit = rate_limit or settings.DEFAULT_RATE_LIMIT
        self._last_request_time = 0.0

    def _respect_rate_limit(self) -> None:
        """
        Sleep if necessary so requests stay at or below the configured rate.
        """
        if self.rate_limit <= 0:
            # No limit configured; never sleep (also avoids division by zero).
            return
        min_interval = 1.0 / self.rate_limit
        time_since_last_request = time.time() - self._last_request_time
        if time_since_last_request < min_interval:
            time.sleep(min_interval - time_since_last_request)
        self._last_request_time = time.time()

    def fetch_url(self, url: str) -> requests.Response:
        """
        Fetch a URL, respecting the rate limit.
        """
        self._respect_rate_limit()
        headers = {
            "User-Agent": self.user_agent,
        }
        response = requests.get(
            url,
            headers=headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response

    def parse_html(self, html: str, selector: Optional[str] = None) -> Dict[str, Any]:
        """
        Parse HTML into a dict of title, meta description, h1 headings, and
        links; optionally extract elements matching a CSS selector.
        """
        soup = BeautifulSoup(html, "lxml")
        result = {
            "title": soup.title.text if soup.title else None,
            "meta_description": None,
            "h1": [h1.text.strip() for h1 in soup.find_all("h1")],
            "links": [
                {"href": a.get("href"), "text": a.text.strip()}
                for a in soup.find_all("a")
                if a.get("href")
            ],
        }

        # Extract the meta description, if present.
        meta_desc = soup.find("meta", attrs={"name": "description"})
        if meta_desc:
            result["meta_description"] = meta_desc.get("content")

        # If a CSS selector is provided, extract matching content.
        if selector:
            selected_elements = soup.select(selector)
            result["selected_content"] = [
                element.text.strip() for element in selected_elements
            ]
            result["selected_html"] = [str(element) for element in selected_elements]

        return result

    def run_job(self, job_id: int) -> ScrapeJob:
        """
        Run a scraping job: fetch the job's URL, persist the raw response,
        parse it, and record success or failure on the job row.
        """
        # Get the job from the DB.
        job = self.db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        if not job:
            raise ValueError(f"Job with ID {job_id} not found")

        # Mark the job as started.
        job.status = JobStatus.IN_PROGRESS
        job.started_at = datetime.now()
        self.db.commit()
        self.db.refresh(job)

        try:
            # Fetch the URL.
            response = self.fetch_url(job.url)

            # Persist the raw response.
            result = ScrapeResult(
                job_id=job.id,
                content_type=response.headers.get("Content-Type"),
                headers=dict(response.headers),
                html_content=response.text,
            )
            self.db.add(result)
            self.db.commit()
            self.db.refresh(result)

            # Parse the HTML and store the extracted data.
            extracted_data = self.parse_html(response.text, job.selector)
            result.extracted_data = extracted_data
            self.db.commit()
            self.db.refresh(result)

            # Mark the job as completed.
            job.status = JobStatus.COMPLETED
            job.completed_at = datetime.now()
            job.result = {"result_id": result.id}
            self.db.commit()
            self.db.refresh(job)

            return job
        except Exception as e:
            # Roll back first in case the session was left in a failed state
            # (e.g. the exception came from a commit), then record the error.
            self.db.rollback()
            job.status = JobStatus.FAILED
            job.completed_at = datetime.now()
            job.error = str(e)
            self.db.commit()
            self.db.refresh(job)
            # Re-raise so callers can handle or log the failure.
            raise
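

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept as a comment). It assumes a SQLAlchemy
# session factory named `SessionLocal` in `app.db.session` and a `Scraper`
# import path of `app.services.scraper`; both paths are assumptions about the
# project layout, not something this module defines. It also assumes the
# ScrapeJob model accepts `url` and `selector` fields, which run_job() reads
# but this module does not declare.
#
#     from app.db.session import SessionLocal
#     from app.models.scrape_job import ScrapeJob
#     from app.services.scraper import Scraper
#
#     db = SessionLocal()
#     try:
#         job = ScrapeJob(url="https://example.com", selector="article p")
#         db.add(job)
#         db.commit()
#         db.refresh(job)
#
#         scraper = Scraper(db, rate_limit=2.0)  # at most ~2 requests/second
#         finished = scraper.run_job(job.id)
#         print(finished.status, finished.result)
#     finally:
#         db.close()
# ---------------------------------------------------------------------------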