Scrape structured data from multiple heterogeneous sources, each with different page layouts and CAPTCHA types, and merge into a unified dataset.
Architecture
Source Configs ──> Source Adapters ──> CAPTCHA Router ──> Data Normalizer ──> Unified Store
(Source configs supply the CSS selectors and URL patterns for each adapter;
the CAPTCHA router solves challenges via the CaptchaAI API.)
Universal CAPTCHA Router
# captcha_router.py
import requests
import re
import time
import os
class CaptchaRouter:
    """Routes to the correct solve method based on CAPTCHA type detected.

    Detects Cloudflare Turnstile, reCAPTCHA v2, and reCAPTCHA v3 markers in
    page HTML, submits the challenge to the CaptchaAI API, polls for the
    token, and re-posts the page with the solved token.
    """

    SUBMIT_URL = "https://ocr.captchaai.com/in.php"
    RESULT_URL = "https://ocr.captchaai.com/res.php"

    def __init__(self):
        # Fail fast at construction time if the key is missing, rather than
        # on the first solve attempt.
        self.api_key = os.environ["CAPTCHAAI_API_KEY"]

    def solve_if_present(self, session, url, html):
        """Return page HTML with any CAPTCHA solved; unchanged if none found.

        session: the requests.Session used to re-post the solved token.
        url: the page URL (sent to the solver as pageurl).
        html: the fetched page body to inspect.
        """
        captcha_type = self._detect_type(html)
        if captcha_type == "turnstile":
            return self._solve_turnstile(session, url, html)
        elif captcha_type == "recaptcha_v2":
            return self._solve_recaptcha(session, url, html)
        elif captcha_type == "recaptcha_v3":
            return self._solve_recaptcha_v3(session, url, html)
        else:
            return html  # No CAPTCHA

    def _detect_type(self, html):
        """Classify the CAPTCHA in *html*; None when no known marker is present."""
        if "cf-turnstile" in html or "challenges.cloudflare.com/turnstile" in html:
            return "turnstile"
        # v3 is loaded via api.js?render=<sitekey>; check before v2 because a
        # page could contain both markers.
        if "recaptcha/api.js?render=" in html:
            return "recaptcha_v3"
        if "data-sitekey" in html and "g-recaptcha" in html:
            return "recaptcha_v2"
        return None

    def _solve_turnstile(self, session, url, html):
        """Solve a Cloudflare Turnstile challenge."""
        sitekey = self._extract_sitekey(html)
        return self._submit_and_poll(session, url, {
            "method": "turnstile",
            "sitekey": sitekey,
            "pageurl": url,
        })

    def _solve_recaptcha(self, session, url, html):
        """Solve a reCAPTCHA v2 challenge."""
        sitekey = self._extract_sitekey(html)
        return self._submit_and_poll(session, url, {
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": url,
        })

    def _solve_recaptcha_v3(self, session, url, html):
        """Solve a reCAPTCHA v3 challenge (score-based, needs action/min_score)."""
        # v3 embeds the sitekey in the api.js?render=<key> script URL.
        match = re.search(r'recaptcha/api\.js\?render=([^"&]+)', html)
        sitekey = match.group(1) if match else self._extract_sitekey(html)
        return self._submit_and_poll(session, url, {
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": url,
            "version": "v3",
            "action": "submit",
            "min_score": "0.5",
        })

    def _extract_sitekey(self, html):
        """Pull the sitekey from a data-sitekey attribute or an inline JS config.

        Returns "" when no sitekey can be found.
        """
        match = re.search(r'data-sitekey="([^"]+)"', html)
        if match:
            return match.group(1)
        match = re.search(r"sitekey['\"]?\s*[:=]\s*['\"]([^'\"]+)", html)
        return match.group(1) if match else ""

    def _submit_and_poll(self, session, url, params):
        """Submit the task to CaptchaAI, poll for the token, re-post the page.

        Raises RuntimeError on an API error and TimeoutError when the solver
        does not answer within the polling window (~15s + 24 * 5s).
        """
        # Copy so the caller's dict is not mutated with key/json fields.
        payload = dict(params)
        payload["key"] = self.api_key
        payload["json"] = 1
        resp = requests.post(self.SUBMIT_URL, data=payload, timeout=30)
        data = resp.json()
        if data.get("status") != 1:
            # e.g. ERROR_WRONG_USER_KEY / ERROR_ZERO_BALANCE — previously this
            # error string was used as a task id and polled forever.
            raise RuntimeError(f"CAPTCHA submit failed: {data.get('request')}")
        task_id = data["request"]
        time.sleep(15)  # typical minimum solve time before first poll
        for _ in range(24):
            resp = requests.get(self.RESULT_URL, params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
            data = resp.json()
            if data.get("status") == 1:
                token = data["request"]
                # Send both field names so whichever verifier the page uses
                # finds the one it expects; the extra field is ignored.
                post_resp = session.post(url, data={
                    "g-recaptcha-response": token,
                    "cf-turnstile-response": token,
                }, timeout=30)
                return post_resp.text
            # "CAPCHA_NOT_READY" (sic — the API's own spelling) means keep waiting.
            if data["request"] != "CAPCHA_NOT_READY":
                raise RuntimeError(data["request"])
            time.sleep(5)
        raise TimeoutError("CAPTCHA timeout")
Source Adapter Base
# adapters.py
import requests
from bs4 import BeautifulSoup
from captcha_router import CaptchaRouter
class SourceAdapter:
    """Base adapter for a data source.

    Fetches search pages over a shared requests.Session, routes any CAPTCHA
    through CaptchaRouter, and extracts items using per-source CSS selectors.
    """

    def __init__(self, name, base_url, selectors):
        self.name = name
        # base_url must contain {query} and {page} format placeholders.
        self.base_url = base_url
        # selectors: {"card": <css for one result card>,
        #             "fields": {field_name: <css within the card>}}
        self.selectors = selectors
        self.router = CaptchaRouter()
        self.session = requests.Session()
        # Browser-like UA: many sources block the default python-requests agent.
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
        )

    def fetch_page(self, url):
        """Fetch *url* and return its HTML with any CAPTCHA already solved."""
        resp = self.session.get(url, timeout=20)
        # Fail loudly on 4xx/5xx instead of silently scraping an error page
        # (which would previously yield zero items and mask the problem).
        resp.raise_for_status()
        return self.router.solve_if_present(self.session, url, resp.text)

    def extract_items(self, html):
        """Parse *html* into a list of field dicts, each tagged with the source name.

        Missing fields are recorded as "" rather than omitted, so every item
        has the same keys.
        """
        soup = BeautifulSoup(html, "html.parser")
        cards = soup.select(self.selectors["card"])
        items = []
        for card in cards:
            item = {}
            for field, selector in self.selectors["fields"].items():
                el = card.select_one(selector)
                item[field] = el.get_text(strip=True) if el else ""
            item["source"] = self.name
            items.append(item)
        return items

    def scrape(self, query, max_pages=3):
        """Scrape up to *max_pages* result pages for *query*.

        Stops early at the first page that yields no items (assumed to be
        past the last page of results).
        """
        all_items = []
        for page in range(1, max_pages + 1):
            url = self.base_url.format(query=query.replace(" ", "+"), page=page)
            html = self.fetch_page(url)
            items = self.extract_items(html)
            if not items:
                break
            all_items.extend(items)
        return all_items
Data Normalizer
# normalizer.py
import re
class DataNormalizer:
    """Normalize data from different sources into a unified schema."""

    # Unified field name -> source-specific aliases, in priority order;
    # the first alias with a truthy value wins.
    FIELD_MAP = {
        "product_name": ["title", "name", "product", "item_name"],
        "price": ["price", "cost", "amount"],
        "description": ["description", "desc", "summary", "details"],
        "category": ["category", "type", "group"],
        "url": ["url", "link", "href"],
    }

    def normalize(self, items):
        """Return a new list of records remapped onto the unified schema."""
        return [self._normalize_one(record) for record in items]

    def _normalize_one(self, record):
        # Map each unified field from the first non-empty alias, defaulting
        # to "" when no alias matches, then derive the numeric price.
        out = {"source": record.get("source", "")}
        for target, aliases in self.FIELD_MAP.items():
            out[target] = next(
                (record[alias] for alias in aliases if record.get(alias)),
                "",
            )
        out["price_numeric"] = self._parse_price(out.get("price", ""))
        return out

    def _parse_price(self, text):
        """Extract a float from a price string like '$1,299.00'; None if no digits."""
        stripped = text.replace(",", "")
        found = re.search(r'[\d,]+\.?\d*', stripped)
        return float(found.group()) if found else None

    def deduplicate(self, items, key_fields=("product_name", "source")):
        """Drop records whose case-insensitive *key_fields* combination was
        already seen, keeping the first occurrence."""
        seen = set()
        kept = []
        for record in items:
            fingerprint = tuple(record.get(f, "").lower() for f in key_fields)
            if fingerprint in seen:
                continue
            seen.add(fingerprint)
            kept.append(record)
        return kept
Unified Store
# store.py
import csv
import json
from datetime import datetime
class UnifiedStore:
    """Write the aggregated dataset to CSV and/or JSON files in *output_dir*."""

    def __init__(self, output_dir="output"):
        import os
        os.makedirs(output_dir, exist_ok=True)
        self.output_dir = output_dir

    def _path(self, filename):
        # Build the destination path from the requested filename. The path
        # was previously hard-coded (a stray "(unknown)" placeholder), so the
        # filename argument was silently ignored and save_csv/save_json
        # overwrote each other.
        return f"{self.output_dir}/{filename}"

    def save_csv(self, items, filename="aggregated.csv"):
        """Write *items* (a list of uniform dicts) as CSV.

        No-op on an empty list: there are no keys to build a header from.
        """
        if not items:
            return
        path = self._path(filename)
        with open(path, "w", newline="", encoding="utf-8") as f:
            # Column order follows the first item's key order.
            writer = csv.DictWriter(f, fieldnames=items[0].keys())
            writer.writeheader()
            writer.writerows(items)
        print(f"Saved {len(items)} items to {path}")

    def save_json(self, items, filename="aggregated.json"):
        """Write *items* plus count and generation timestamp as indented JSON."""
        path = self._path(filename)
        with open(path, "w", encoding="utf-8") as f:
            json.dump({
                "items": items,
                "count": len(items),
                "generated_at": datetime.now().isoformat(),
            }, f, indent=2)
        print(f"Saved {len(items)} items to {path}")
Main Pipeline
# main.py
import time
from adapters import SourceAdapter
from normalizer import DataNormalizer
from store import UnifiedStore
# Per-source scrape configuration. Each entry supplies:
#   name      - label stamped onto every extracted item ("source" field)
#   base_url  - URL template; must contain {query} and {page} placeholders
#   selectors - "card": CSS selector for one result card;
#               "fields": per-field CSS selectors within a card. Field names
#               differ per source and are unified later by DataNormalizer.
SOURCES = [
    {
        "name": "Store Alpha",
        "base_url": "https://store-alpha.example.com/search?q={query}&page={page}",
        "selectors": {
            "card": ".product-card",
            "fields": {
                "title": "h3.product-title",
                "price": ".price",
                "category": ".category-tag",
                "description": ".product-desc",
            },
        },
    },
    {
        "name": "Store Beta",
        "base_url": "https://store-beta.example.com/find?term={query}&p={page}",
        "selectors": {
            "card": ".item-listing",
            "fields": {
                # Note the different field names vs. Store Alpha; the
                # normalizer's FIELD_MAP maps both onto the unified schema.
                "name": ".item-name",
                "cost": ".item-price",
                "type": ".item-category",
                "summary": ".item-summary",
            },
        },
    },
]
# Search terms run against every configured source.
QUERIES = ["wireless headphones", "bluetooth speaker"]
def main():
    """Scrape every configured source for every query, then normalize,
    deduplicate, and export the combined dataset."""
    normalizer = DataNormalizer()
    store = UnifiedStore()
    collected = []
    for cfg in SOURCES:
        adapter = SourceAdapter(cfg["name"], cfg["base_url"], cfg["selectors"])
        for query in QUERIES:
            print(f"Scraping {cfg['name']} for '{query}'...")
            raw_items = adapter.scrape(query)
            print(f" Found {len(raw_items)} items")
            collected.extend(raw_items)
            time.sleep(5)  # be polite between query runs
    # Normalize and deduplicate
    normalized = normalizer.normalize(collected)
    unique = normalizer.deduplicate(normalized)
    print(f"\nTotal: {len(collected)} raw → {len(normalized)} normalized → {len(unique)} unique")
    # Export
    store.save_csv(unique)
    store.save_json(unique)


if __name__ == "__main__":
    main()
FAQ
How do I add a new data source?
Add an entry to SOURCES with the site's URL template, card CSS selector, and field selectors. The normalizer maps varied field names to the unified schema.
What if sources use different CAPTCHA types?
The CaptchaRouter auto-detects reCAPTCHA v2, v3, and Turnstile. Add more detection patterns for GeeTest or BLS if needed.
How do I handle sources with JavaScript rendering?
Replace requests.get() with Selenium or Playwright in the adapter's fetch_page method. The CAPTCHA router works the same way.
Related Guides
Aggregate data from any source — start with CaptchaAI.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In. No comments yet.