import json
from typing import Optional

import typer
from rich.console import Console
from rich.table import Table
from sqlalchemy.orm import Session

from app.db.session import SessionLocal
from app.crud import scrape_job, scrape_result
from app.models.scrape_job import JobStatus
from app.services.scraper import Scraper

app = typer.Typer(help="Web Scraper CLI")
console = Console()


def get_db() -> Session:
    """
    Get a database session.
    """
    return SessionLocal()


@app.command("scrape")
def scrape_url(
    url: str = typer.Argument(..., help="URL to scrape"),
    selector: Optional[str] = typer.Option(
        None, help="CSS selector to extract content"
    ),
    user_agent: Optional[str] = typer.Option(
        None, help="User agent to use for request"
    ),
    timeout: Optional[int] = typer.Option(None, help="Timeout for request in seconds"),
    output: Optional[str] = typer.Option(
        None, help="Output file path for results (JSON)"
    ),
):
    """
    Scrape a URL and extract content.
    """
    console.print(f"Scraping [bold]{url}[/bold]...")

    db = get_db()
    try:
        # Create a new scrape job, dropping options the user did not set
        job_data = {
            "url": url,
            "selector": selector,
            "user_agent": user_agent,
            "timeout": timeout,
        }
        job_in = {k: v for k, v in job_data.items() if v is not None}

        job = scrape_job.create(db=db, obj_in=job_in)
        console.print(f"Created scrape job with ID [bold]{job.id}[/bold]")

        # Run the job
        scraper = Scraper(db=db, user_agent=user_agent, timeout=timeout)
        job = scraper.run_job(job_id=job.id)

        if job.status == JobStatus.COMPLETED:
            console.print("[bold green]Scraping completed successfully![/bold green]")

            # Get the result
            result = scrape_result.get_latest_by_job_id(db=db, job_id=job.id)

            # Print basic info
            console.print("\n[bold]Basic Information:[/bold]")
            table = Table(show_header=True, header_style="bold")
            table.add_column("Attribute")
            table.add_column("Value")

            if result and result.extracted_data:
                data = result.extracted_data

                # Add rows to table
                if "title" in data:
                    table.add_row("Title", data["title"] or "")
                if "meta_description" in data:
                    table.add_row("Description", data["meta_description"] or "")
                if "h1" in data:
                    table.add_row(
                        "H1 Tags", ", ".join(data["h1"]) if data["h1"] else ""
                    )
                if "links" in data:
                    link_count = len(data["links"]) if data["links"] else 0
                    table.add_row("Links", str(link_count))
                if selector and "selected_content" in data:
                    content_count = (
                        len(data["selected_content"])
                        if data["selected_content"]
                        else 0
                    )
                    table.add_row(
                        f"Selected Content ({selector})", str(content_count)
                    )

                console.print(table)

                # Write results to file if specified
                if output:
                    with open(output, "w") as f:
                        json.dump(data, f, indent=2)
                    console.print(f"\nResults saved to [bold]{output}[/bold]")

                # Ask if user wants to see more details
                if typer.confirm("\nDo you want to see the full extracted data?"):
                    console.print_json(json.dumps(data))
            else:
                console.print("[yellow]No data extracted.[/yellow]")
        else:
            console.print(f"[bold red]Scraping failed:[/bold red] {job.error}")
    except Exception as e:
        console.print(f"[bold red]Error:[/bold red] {str(e)}")
    finally:
        db.close()


@app.command("list")
def list_jobs(
    status: Optional[str] = typer.Option(
        None, help="Filter by status (pending, in_progress, completed, failed)"
    ),
    limit: int = typer.Option(10, help="Limit number of jobs"),
):
    """
    List scrape jobs.
    """
""" db = get_db() try: # Get jobs based on status if status: try: job_status = JobStatus(status) jobs = scrape_job.get_by_status(db=db, status=job_status, limit=limit) total = scrape_job.count_by_status(db=db, status=job_status) console.print( f"Found [bold]{total}[/bold] jobs with status [bold]{status}[/bold]" ) except ValueError: console.print(f"[bold red]Invalid status:[/bold red] {status}") return else: jobs = scrape_job.get_multi(db=db, limit=limit) total = scrape_job.count(db=db) console.print(f"Found [bold]{total}[/bold] jobs") if not jobs: console.print("[yellow]No jobs found.[/yellow]") return # Create table table = Table(show_header=True, header_style="bold") table.add_column("ID") table.add_column("URL") table.add_column("Status") table.add_column("Created") table.add_column("Updated") # Add rows for job in jobs: table.add_row( str(job.id), job.url, job.status.value, job.created_at.strftime("%Y-%m-%d %H:%M:%S"), job.updated_at.strftime("%Y-%m-%d %H:%M:%S"), ) console.print(table) except Exception as e: console.print(f"[bold red]Error:[/bold red] {str(e)}") finally: db.close() @app.command("show") def show_job( job_id: int = typer.Argument(..., help="ID of the job to show"), ): """ Show details of a scrape job. """ db = get_db() try: # Get job job = scrape_job.get(db=db, id=job_id) if not job: console.print(f"[bold red]Job not found:[/bold red] {job_id}") return # Print job details console.print(f"\n[bold]Job {job_id}[/bold]") console.print(f"URL: [bold]{job.url}[/bold]") console.print(f"Status: [bold]{job.status.value}[/bold]") console.print(f"Created: [bold]{job.created_at}[/bold]") console.print(f"Updated: [bold]{job.updated_at}[/bold]") if job.started_at: console.print(f"Started: [bold]{job.started_at}[/bold]") if job.completed_at: console.print(f"Completed: [bold]{job.completed_at}[/bold]") if job.selector: console.print(f"Selector: [bold]{job.selector}[/bold]") if job.error: console.print(f"Error: [bold red]{job.error}[/bold red]") # Get results if job is completed if job.status == JobStatus.COMPLETED: result = scrape_result.get_latest_by_job_id(db=db, job_id=job.id) if result and result.extracted_data: console.print("\n[bold]Extracted Data:[/bold]") # Ask if user wants to see the data if typer.confirm("Do you want to see the extracted data?"): console.print_json(json.dumps(result.extracted_data)) else: console.print("[yellow]No data extracted.[/yellow]") except Exception as e: console.print(f"[bold red]Error:[/bold red] {str(e)}") finally: db.close() @app.command("run") def run_job( job_id: int = typer.Argument(..., help="ID of the job to run"), ): """ Run a scrape job. """ db = get_db() try: # Get job job = scrape_job.get(db=db, id=job_id) if not job: console.print(f"[bold red]Job not found:[/bold red] {job_id}") return console.print(f"Running job [bold]{job_id}[/bold]...") # Run the job scraper = Scraper(db=db) job = scraper.run_job(job_id=job.id) if job.status == JobStatus.COMPLETED: console.print("[bold green]Job completed successfully![/bold green]") else: console.print(f"[bold red]Job failed:[/bold red] {job.error}") except Exception as e: console.print(f"[bold red]Error:[/bold red] {str(e)}") finally: db.close() if __name__ == "__main__": app()