Scrape competitor pricing, product listings, and feature pages. Store historical data and generate comparison reports.
Architecture
Competitor Sites ──> CAPTCHA Solver ──> Data Extractors
                                              │
                                        SQLite Store
                                              │
                                      Dashboard Report
Data Models
# models.py
import sqlite3
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
@dataclass
class CompetitorData:
    """One scraped observation: a single metric value for one competitor.

    `numeric_value` carries the parsed number (when the raw `value` text
    contains one) so reports can sort and format; `scraped_at` defaults to
    the time of construction when not supplied.
    """

    competitor: str
    metric: str
    value: str
    numeric_value: Optional[float] = None
    url: str = ""
    scraped_at: str = ""

    def __post_init__(self):
        # Stamp the record with "now" only when the caller left it blank.
        self.scraped_at = self.scraped_at or datetime.now().isoformat()
class CompetitorDB:
    """SQLite-backed store for scraped competitor metrics.

    One row per observation (competitor, metric, scrape timestamp). Also
    usable as a context manager so the connection is closed cleanly —
    the original never closed it.
    """

    def __init__(self, path="competitor_data.db"):
        self.conn = sqlite3.connect(path)
        self._init()

    def _init(self):
        # Idempotent schema setup. The composite index backs the WHERE /
        # GROUP BY clauses used by get_history() and latest_comparison().
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                competitor TEXT,
                metric TEXT,
                value TEXT,
                numeric_value REAL,
                url TEXT,
                scraped_at TEXT
            )
        """)
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_metrics_lookup "
            "ON metrics (metric, competitor, scraped_at)"
        )
        self.conn.commit()

    def save(self, data: "CompetitorData"):
        """Insert one observation and commit.

        Only attribute access is used, so any object exposing the six
        CompetitorData fields is accepted.
        """
        self.conn.execute(
            """INSERT INTO metrics
               (competitor, metric, value, numeric_value, url, scraped_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (data.competitor, data.metric, data.value,
             data.numeric_value, data.url, data.scraped_at),
        )
        self.conn.commit()

    def get_history(self, competitor, metric, limit=30):
        """Return up to `limit` (value, numeric_value, scraped_at) rows for
        one competitor/metric pair, newest first."""
        cursor = self.conn.execute(
            """SELECT value, numeric_value, scraped_at
               FROM metrics
               WHERE competitor = ? AND metric = ?
               ORDER BY scraped_at DESC LIMIT ?""",
            (competitor, metric, limit),
        )
        return cursor.fetchall()

    def latest_comparison(self, metric):
        """Return one (competitor, value, numeric_value, latest) row per
        competitor for `metric`, ordered by numeric_value.

        Relies on SQLite's documented bare-column-with-MAX() behavior:
        the non-aggregated columns come from the row that holds the
        MAX(scraped_at), i.e. the most recent observation.
        """
        cursor = self.conn.execute(
            """SELECT competitor, value, numeric_value, MAX(scraped_at) as latest
               FROM metrics WHERE metric = ?
               GROUP BY competitor ORDER BY numeric_value""",
            (metric,),
        )
        return cursor.fetchall()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never swallow exceptions
CAPTCHA Solver
# solver.py
import requests
import time
import re
import os
class CaptchaSolver:
    """Solves Google reCAPTCHA challenges through the CaptchaAI HTTP API."""

    SUBMIT_URL = "https://ocr.captchaai.com/in.php"
    RESULT_URL = "https://ocr.captchaai.com/res.php"

    def __init__(self):
        # KeyError here is deliberate: fail at startup, not mid-scrape,
        # when the API key is not configured.
        self.api_key = os.environ["CAPTCHAAI_API_KEY"]

    def solve_if_needed(self, session, url, html):
        """Return `html` unchanged when no reCAPTCHA widget is present;
        otherwise solve the challenge and return the post-solve page HTML.

        Raises:
            RuntimeError: the API rejected the submission or the solve.
            TimeoutError: the solve did not finish within ~2 minutes.
        """
        if "data-sitekey" not in html:
            return html
        match = re.search(r'data-sitekey="([^"]+)"', html)
        if not match:
            return html
        sitekey = match.group(1)

        resp = requests.post(self.SUBMIT_URL, data={
            "key": self.api_key,
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": url,
            "json": 1,
        }, timeout=30)
        submit = resp.json()
        # On failure the API sets status=0 and puts an error code (e.g.
        # ERROR_WRONG_USER_KEY) in "request". The original code skipped this
        # check and would poll the error string as if it were a task id.
        if submit.get("status") != 1:
            raise RuntimeError(f"CAPTCHA submit failed: {submit.get('request')}")
        task_id = submit["request"]

        time.sleep(15)  # typical minimum solve latency before first poll
        for _ in range(24):  # 15s + 24 * 5s ≈ 2-minute budget
            resp = requests.get(self.RESULT_URL, params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
            data = resp.json()
            if data.get("status") == 1:
                # Re-submit the page with the solved token so the target
                # site marks this session as passed.
                post_resp = session.post(url, data={
                    "g-recaptcha-response": data["request"],
                }, timeout=30)
                return post_resp.text
            # "CAPCHA_NOT_READY" (the API's spelling) means keep polling;
            # anything else is a terminal error code.
            if data["request"] != "CAPCHA_NOT_READY":
                raise RuntimeError(data["request"])
            time.sleep(5)
        raise TimeoutError("CAPTCHA solve timeout")
Competitor Scraper
# scraper.py
import requests
import re
from bs4 import BeautifulSoup
from solver import CaptchaSolver
from models import CompetitorData
class CompetitorScraper:
    """Fetches competitor pages (solving CAPTCHAs when present) and extracts
    pricing plans, feature lists, and product counts as CompetitorData rows."""

    def __init__(self):
        self.solver = CaptchaSolver()
        self.session = requests.Session()
        # Present as desktop Chrome; the default python-requests UA is
        # frequently blocked or served degraded markup.
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
        )

    @staticmethod
    def _extract_numeric(text):
        """Return the first number found in `text` as a float, or None.

        Strips thousands separators ("1,299.00" -> 1299.0). Returns None
        both when no digits are present and when the matched run is not a
        valid number (e.g. a bare "." — the original crashed on that).
        """
        match = re.search(r'[\d,.]+', text)
        if not match:
            return None
        try:
            return float(match.group().replace(",", ""))
        except ValueError:
            return None

    def scrape_pricing(self, competitor_name, url, plan_selector, price_selector):
        """Return one CompetitorData per plan card found on the pricing page.

        `plan_selector` picks the plan containers; within each, the plan name
        is taken from the first h3/h2/.plan-name element and the price from
        `price_selector`. Cards missing either element are skipped.
        """
        html = self._fetch(url)
        soup = BeautifulSoup(html, "html.parser")
        data = []
        for plan in soup.select(plan_selector):
            name_el = plan.select_one("h3, h2, .plan-name")
            price_el = plan.select_one(price_selector)
            if not name_el or not price_el:
                continue
            price_text = price_el.get_text(strip=True)
            plan_name = name_el.get_text(strip=True).lower().replace(" ", "_")
            data.append(CompetitorData(
                competitor=competitor_name,
                metric=f"price_{plan_name}",
                value=price_text,
                numeric_value=self._extract_numeric(price_text),
                url=url,
            ))
        return data

    def scrape_features(self, competitor_name, url, feature_list_selector):
        """Return one CompetitorData (metric="feature") per non-empty <li>
        under the selected feature list."""
        html = self._fetch(url)
        soup = BeautifulSoup(html, "html.parser")
        features = soup.select(f"{feature_list_selector} li")
        return [
            CompetitorData(
                competitor=competitor_name,
                metric="feature",
                value=f.get_text(strip=True),
                url=url,
            )
            for f in features if f.get_text(strip=True)
        ]

    def scrape_product_count(self, competitor_name, url, count_selector):
        """Return a product_count CompetitorData, or None when the selector
        or a parseable number is missing."""
        html = self._fetch(url)
        soup = BeautifulSoup(html, "html.parser")
        el = soup.select_one(count_selector)
        if el:
            text = el.get_text(strip=True)
            # Require a leading digit so a lone "," cannot match (the
            # original's [\d,]+ made int("") possible and crashed).
            match = re.search(r'\d[\d,]*', text)
            if match:
                return CompetitorData(
                    competitor=competitor_name,
                    metric="product_count",
                    value=text,
                    numeric_value=int(match.group().replace(",", "")),
                    url=url,
                )
        return None

    def _fetch(self, url):
        # Fetch the page, then let the solver transparently replace the HTML
        # when a CAPTCHA interstitial was served.
        resp = self.session.get(url, timeout=20)
        return self.solver.solve_if_needed(self.session, url, resp.text)
Report Generator
# report.py
from models import CompetitorDB
def generate_report(db: "CompetitorDB", metrics):
    """Build a plain-text report comparing each metric's latest value
    across competitors.

    Metrics with no stored rows are skipped. Rows with a numeric value get
    a formatted marker: dollars for price metrics, a plain count otherwise.
    """
    divider = "=" * 60
    lines = [divider, "Competitor Analysis Report", divider, ""]
    for metric in metrics:
        rows = db.latest_comparison(metric)
        if not rows:
            continue
        lines.append(f"--- {metric.replace('_', ' ').title()} ---")
        for comp, value, numeric, ts in rows:
            if numeric is None:
                marker = ""
            elif "price" in metric:
                marker = f" (${numeric:,.2f})"
            else:
                marker = f" ({numeric:,.0f})"
            lines.append(f" {comp}: {value}{marker}")
        lines.append("")
    return "\n".join(lines)
def generate_trend(db: "CompetitorDB", competitor, metric, periods=10):
    """Render a chronological (oldest-first) text trend of the last
    `periods` observations for one competitor/metric pair."""
    history = db.get_history(competitor, metric, limit=periods)
    if not history:
        return f"No data for {competitor} — {metric}"
    out = [f"Trend: {competitor} — {metric}", "-" * 40]
    # get_history returns newest-first; reverse for a chronological read.
    for value, _numeric, ts in reversed(history):
        out.append(f" {ts[:10]}: {value}")
    return "\n".join(out)
Main Runner
# main.py
import time
from models import CompetitorDB
from scraper import CompetitorScraper
from report import generate_report
# Per-competitor scrape configuration. Each entry supplies the pricing-page
# URL plus the CSS selectors for that site's plan containers and price
# elements — selectors differ per competitor and must be updated whenever a
# site's markup changes (see Troubleshooting: "Prices not extracted").
COMPETITORS = [
    {
        "name": "Competitor A",
        "pricing_url": "https://competitor-a.example.com/pricing",
        "plan_selector": ".pricing-plan",
        "price_selector": ".price",
    },
    {
        "name": "Competitor B",
        "pricing_url": "https://competitor-b.example.com/pricing",
        "plan_selector": ".plan-card",
        "price_selector": ".plan-price",
    },
]
def main():
    """Scrape each configured competitor's pricing page, persist the rows,
    then print the comparison report and save it to competitor_report.txt."""
    db = CompetitorDB()
    scraper = CompetitorScraper()

    for comp in COMPETITORS:
        print(f"Scraping {comp['name']}...")
        try:
            for row in scraper.scrape_pricing(
                comp["name"],
                comp["pricing_url"],
                comp["plan_selector"],
                comp["price_selector"],
            ):
                db.save(row)
                print(f" {row.metric}: {row.value}")
        except Exception as e:
            # One failing site should not abort the whole run.
            print(f" Error: {e}")
        time.sleep(5)

    # Generate report
    metrics = ["price_basic", "price_pro", "price_enterprise", "product_count"]
    report = generate_report(db, metrics)
    print(report)
    with open("competitor_report.txt", "w") as f:
        f.write(report)


if __name__ == "__main__":
    main()
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Prices not extracted | Selector mismatch | Inspect page HTML and update selectors per competitor |
| Historical data missing | First run | Data accumulates; run daily for trend visibility |
| CAPTCHA on pricing page | Bot detection | Add delays and use session cookies |
| Report shows stale data | Same entry re-inserted | Use latest_comparison which groups by MAX date |
FAQ
How do I visualize trends?
Export data from SQLite and plot with matplotlib, or pipe the CSV output into Google Sheets for built-in charting.
Can I track non-pricing metrics?
Yes. Use scrape_features for feature lists or scrape_product_count for catalog sizes. Add custom scrapers for any metric.
How do I get alerts on price changes?
Compare today's scraped prices with yesterday's stored values and send alerts (Slack/email) when the difference exceeds a threshold.
Related Guides
Track competitors at scale — start with CaptchaAI.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign in. No comments yet.