Track changes on web pages protected by CAPTCHAs. Detect updates to pricing, terms, product listings, or any page content — and get notified instantly.
Architecture
Scheduler ──> Page Fetcher ──> CAPTCHA Handler ──> Content Extractor ──> Diff Engine ──> Alert Dispatcher

The Diff Engine compares each freshly extracted page against the previous version held in the Content Snapshot Store.
# store.py
import hashlib
import json
import os
from datetime import datetime
class SnapshotStore:
    """Persist one JSON snapshot per URL and detect content changes.

    Snapshots live as ``<md5(url)>.json`` files inside *data_dir*. Each file
    records the URL, the extracted content, a SHA-256 digest of that content,
    and the capture timestamp.
    """

    def __init__(self, data_dir="snapshots"):
        self.data_dir = data_dir
        # Create the directory eagerly so save() never hits a missing dir.
        os.makedirs(data_dir, exist_ok=True)

    def _key(self, url):
        """Filesystem-safe key for *url* (MD5 is used for naming only,
        not for integrity — content integrity uses SHA-256 below)."""
        return hashlib.md5(url.encode()).hexdigest()

    def _path(self, url):
        """Snapshot file path for *url* (shared by save/load)."""
        return os.path.join(self.data_dir, f"{self._key(url)}.json")

    def save(self, url, content):
        """Write (overwriting) the snapshot for *url* with *content*."""
        data = {
            "url": url,
            "content": content,
            "hash": hashlib.sha256(content.encode()).hexdigest(),
            # Timezone-aware timestamp; bare datetime.now() is ambiguous.
            "timestamp": datetime.now().astimezone().isoformat(),
        }
        with open(self._path(url), "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def load(self, url):
        """Return the stored snapshot dict for *url*, or None if absent."""
        # EAFP: open directly rather than exists()-then-open (avoids the
        # check/use race and a second stat call).
        try:
            with open(self._path(url), "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def has_changed(self, url, new_content):
        """True when *new_content* differs from the last saved snapshot."""
        prev = self.load(url)
        if prev is None:
            return True  # First check: report changed so a snapshot gets saved.
        new_hash = hashlib.sha256(new_content.encode()).hexdigest()
        return prev["hash"] != new_hash
CAPTCHA-Aware Fetcher
# fetcher.py
import requests
import re
import time
import os
class PageFetcher:
    """Fetch pages, transparently solving Cloudflare Turnstile or reCAPTCHA
    challenges via the CaptchaAI solver API.

    Requires the CAPTCHAAI_API_KEY environment variable (KeyError if unset).
    """

    SUBMIT_URL = "https://ocr.captchaai.com/in.php"
    RESULT_URL = "https://ocr.captchaai.com/res.php"

    def __init__(self):
        # Fail fast when the solver key is missing.
        self.api_key = os.environ["CAPTCHAAI_API_KEY"]
        self.session = requests.Session()
        # Desktop Chrome UA lowers the chance of an immediate challenge page.
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
        )

    def fetch(self, url):
        """Return the page HTML, solving a CAPTCHA first if one is detected."""
        resp = self.session.get(url, timeout=20)
        if self._has_turnstile(resp.text):
            resp = self._solve_turnstile(url, resp.text)
        elif self._has_recaptcha(resp.text):
            resp = self._solve_recaptcha(url, resp.text)
        return resp.text

    def _has_turnstile(self, html):
        """Heuristic: page embeds a Cloudflare Turnstile widget."""
        return "cf-turnstile" in html or "challenges.cloudflare.com/turnstile" in html

    def _has_recaptcha(self, html):
        """Heuristic: page carries a data-sitekey attribute (Turnstile pages
        also match this, so fetch() checks Turnstile first)."""
        return "data-sitekey" in html

    def _solve_turnstile(self, url, html):
        """Solve a Cloudflare Turnstile challenge and refetch the page."""
        return self._submit(url, html, method="turnstile", key_field="sitekey")

    def _solve_recaptcha(self, url, html):
        """Solve a Google reCAPTCHA challenge and refetch the page."""
        return self._submit(url, html, method="userrecaptcha", key_field="googlekey")

    def _submit(self, url, html, method, key_field):
        """Extract the sitekey and submit a solve task (shared by both types).

        Raises RuntimeError when the solver rejects the submission.
        """
        match = re.search(r'data-sitekey="([^"]+)"', html)
        if not match:
            # No sitekey found — retry the plain fetch as a best effort.
            return self.session.get(url, timeout=20)
        resp = requests.post(self.SUBMIT_URL, data={
            "key": self.api_key,
            "method": method,
            key_field: match.group(1),
            "pageurl": url,
            "json": 1,
        }, timeout=30)
        payload = resp.json()
        # BUG FIX: in.php reports errors as {"status": 0, "request": "ERROR_..."};
        # the original code polled using the error string as a task id.
        if payload.get("status") != 1:
            raise RuntimeError(f"CAPTCHA submit failed: {payload.get('request')}")
        return self._poll_and_retry(url, payload["request"])

    def _poll_and_retry(self, url, task_id):
        """Poll res.php until the token is ready, then re-submit the page.

        Raises RuntimeError on a solver error, TimeoutError after ~2 minutes
        of polling without a result.
        """
        time.sleep(15)  # Solvers rarely finish faster than ~15 s.
        for _ in range(24):  # 24 polls * 5 s = up to 2 more minutes.
            resp = requests.get(self.RESULT_URL, params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
            data = resp.json()
            if data.get("status") == 1:
                token = data["request"]
                # Post the token under both field names; the target server
                # ignores whichever one it does not expect.
                return self.session.post(url, data={
                    "g-recaptcha-response": token,
                    "cf-turnstile-response": token,
                }, timeout=30)
            if data["request"] != "CAPCHA_NOT_READY":  # (sic) API spelling
                raise RuntimeError(data["request"])
            time.sleep(5)
        raise TimeoutError("CAPTCHA solve timeout")
Diff Engine
# differ.py
import difflib
def compute_diff(old_content, new_content, context_lines=3):
    """Render a unified diff ("previous" -> "current") as one string.

    Returns the empty string when the two texts are identical.
    """
    delta = difflib.unified_diff(
        old_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile="previous",
        tofile="current",
        n=context_lines,
    )
    return "".join(delta)
def extract_changes(old_content, new_content):
    """Summarize line-level additions and removals between two texts.

    Comparison is set-based: duplicate lines collapse, and the order of
    entries in "added"/"removed" is unspecified.
    """
    old_lines = set(old_content.splitlines())
    new_lines = set(new_content.splitlines())
    # Compute each set difference once (the original computed both twice).
    added = new_lines - old_lines
    removed = old_lines - new_lines
    return {
        "added": list(added),
        "removed": list(removed),
        "added_count": len(added),
        "removed_count": len(removed),
    }
Alert Dispatcher
# alerter.py
import json
import requests
def send_slack_alert(webhook_url, url, changes):
    """Post a change notification to a Slack incoming webhook.

    *changes* is the dict produced by differ.extract_changes. Raises
    requests.HTTPError on a non-2xx response (the original silently
    ignored delivery failures).
    """
    text = (
        f"*Content Change Detected*\n"
        f"URL: {url}\n"
        f"Added lines: {changes['added_count']}\n"
        f"Removed lines: {changes['removed_count']}\n"
    )
    if changes["added"]:
        # Show at most five added lines as a code block.
        sample = "\n".join(changes["added"][:5])
        text += f"\n*Sample additions:*\n```{sample}```"
    resp = requests.post(webhook_url, json={"text": text}, timeout=10)
    resp.raise_for_status()  # surface 4xx/5xx instead of failing silently
def send_email_alert(to_email, url, diff_text):
    """Send a change alert via SMTP (STARTTLS).

    SMTP settings come from the SMTP_HOST / SMTP_PORT / SMTP_USER /
    SMTP_PASSWORD environment variables so credentials are not hard-coded;
    the original placeholder values remain the defaults for compatibility.
    """
    import os
    import smtplib
    from email.mime.text import MIMEText

    host = os.environ.get("SMTP_HOST", "smtp.example.com")
    port = int(os.environ.get("SMTP_PORT", "587"))
    user = os.environ.get("SMTP_USER", "monitor@example.com")
    password = os.environ.get("SMTP_PASSWORD", "YOUR_PASSWORD")

    msg = MIMEText(f"Changes detected on {url}:\n\n{diff_text}")
    msg["Subject"] = f"Content Change: {url}"
    msg["To"] = to_email
    msg["From"] = user

    with smtplib.SMTP(host, port) as server:
        server.starttls()
        server.login(user, password)
        server.send_message(msg)
Content Extractor
# extractor.py
from bs4 import BeautifulSoup
def extract_content(html, selector=None):
    """Pull the meaningful text out of *html*, dropping boilerplate.

    Script/style tags and page chrome (nav, header, footer, aside) are
    removed first. If *selector* matches, its text is returned; otherwise
    the first of main/article/.content/#content is used, falling back to
    the whole document.
    """
    soup = BeautifulSoup(html, "html.parser")
    # Strip non-content elements before extracting any text.
    for node in soup(["script", "style", "nav", "footer", "header", "aside"]):
        node.decompose()

    if selector:
        chosen = soup.select_one(selector)
        if chosen:
            return chosen.get_text(separator="\n", strip=True)

    # Fall back to the usual content containers, then the whole page.
    chosen = soup.select_one("main, article, .content, #content")
    if chosen:
        return chosen.get_text(separator="\n", strip=True)
    return soup.get_text(separator="\n", strip=True)
Main Monitor
# main.py
import time
import os
from fetcher import PageFetcher
from store import SnapshotStore
from extractor import extract_content
from differ import compute_diff, extract_changes
from alerter import send_slack_alert
# Pages to monitor: each entry is a URL plus an optional CSS "selector"
# that narrows extraction to the meaningful content area of that page.
PAGES = [
    {"url": "https://example.com/pricing", "selector": ".pricing-table"},
    {"url": "https://example.com/terms", "selector": ".terms-content"},
    {"url": "https://example.com/products", "selector": ".product-grid"},
]
# Slack incoming-webhook URL; alerting is skipped when this is unset/empty.
SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL", "")
def main():
    """Check every configured page once, alerting on detected changes.

    Errors on one page are reported and skipped so the remaining pages
    still get checked.
    """
    fetcher = PageFetcher()
    store = SnapshotStore()
    for page in PAGES:
        url = page["url"]
        selector = page.get("selector")
        print(f"Checking: {url}")
        try:
            html = fetcher.fetch(url)
            content = extract_content(html, selector)
            if store.has_changed(url, content):
                prev = store.load(url)
                if prev:
                    # NOTE: the original also computed compute_diff() here but
                    # never used the result — that dead work is removed.
                    changes = extract_changes(prev["content"], content)
                    print(f" CHANGED — +{changes['added_count']} / -{changes['removed_count']} lines")
                    if SLACK_WEBHOOK:
                        send_slack_alert(SLACK_WEBHOOK, url, changes)
                else:
                    print(" First snapshot saved")
                store.save(url, content)
            else:
                print(" No changes")
        except Exception as e:
            # Top-level boundary: one bad page must not abort the whole run.
            print(f" Error: {e}")
        time.sleep(5)  # be polite between page checks


if __name__ == "__main__":
    main()
FAQ
How often should I run the monitor?
For pricing pages, every 1-4 hours. For terms/legal pages, daily. For product listings, every 6-12 hours. Adjust based on how fast the content changes.
How do I ignore minor layout changes?
Use CSS selectors to target only the meaningful content area. The extractor strips headers, footers, and navigation automatically.
Can I track changes over time?
Yes. Modify SnapshotStore to keep historical snapshots instead of overwriting. Store each version with a timestamp for a full change history.
Related Guides
Never miss a content change — start with CaptchaAI.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign in. No comments yet.