Track product or business reviews across platforms, detect sentiment shifts, and receive alerts when negative reviews spike.
Architecture
Scheduler (cron)
  └──> ReviewScraper (per source)
    └──> CAPTCHA Solver (CaptchaAI)
      └──> Sentiment Analyzer
        └──> Alerter (email/Slack)
Review Data Model
# models.py
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
@dataclass
class Review:
    """A single product review collected from one source platform.

    `scraped_at` is stored as UTC in SQLite's canonical datetime format
    ("YYYY-MM-DD HH:MM:SS") so that lexical string comparison against
    `datetime('now', ...)` in SQL (see ReviewDB.recent_sentiment) equals
    chronological comparison.
    """
    source: str               # platform name, e.g. "Platform A"
    author: str               # reviewer display name ("Anonymous" when absent)
    rating: float             # star rating; 0.0 when it could not be parsed
    text: str                 # review body text
    date: str                 # review date as displayed on the page (free-form)
    url: str                  # page URL the review was scraped from
    sentiment: float = 0.0    # filled in later by analyze_sentiment
    scraped_at: str = ""      # auto-populated in __post_init__ when empty

    def __post_init__(self):
        if not self.scraped_at:
            # BUG FIX: the original used datetime.now().isoformat() — local
            # time with a "T" separator — which does not compare correctly
            # against SQLite's datetime('now'), which is UTC with a space
            # separator. Store UTC in SQLite's exact format instead.
            self.scraped_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
class ReviewDB:
    """SQLite-backed store for scraped reviews with de-duplication."""

    def __init__(self, path="reviews.db"):
        self.conn = sqlite3.connect(path)
        self._init()

    def _init(self):
        # UNIQUE(source, author, date, text) lets INSERT OR IGNORE silently
        # drop re-scraped duplicates.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS reviews (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT, author TEXT, rating REAL,
                text TEXT, date TEXT, url TEXT,
                sentiment REAL, scraped_at TEXT,
                UNIQUE(source, author, date, text)
            )
        """)
        self.conn.commit()

    def save(self, review: Review) -> bool:
        """Insert a review; return True only when a new row was actually added.

        BUG FIX: the original caught sqlite3.IntegrityError to detect
        duplicates, but with INSERT OR IGNORE a UNIQUE collision never
        raises — the row is silently skipped and the method returned True
        for duplicates. Check cursor.rowcount instead (0 when ignored).
        """
        cursor = self.conn.execute(
            """INSERT OR IGNORE INTO reviews
            (source, author, rating, text, date, url, sentiment, scraped_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (review.source, review.author, review.rating,
             review.text, review.date, review.url,
             review.sentiment, review.scraped_at),
        )
        self.conn.commit()
        return cursor.rowcount > 0

    def recent_sentiment(self, source, days=7):
        """Return (avg_sentiment, count) for `source` over the past `days` days.

        avg_sentiment is None (and count is 0) when no rows match. The
        comparison relies on scraped_at being stored in SQLite's UTC
        "YYYY-MM-DD HH:MM:SS" format so that string order equals time order.
        """
        cursor = self.conn.execute(
            """SELECT AVG(sentiment), COUNT(*)
            FROM reviews WHERE source = ?
            AND scraped_at >= datetime('now', ?)""",
            (source, f"-{days} days"),
        )
        return cursor.fetchone()
CAPTCHA Solver
# solver.py
import requests
import time
import os
def solve_recaptcha(sitekey, pageurl):
    """Solve a reCAPTCHA v2 via the CaptchaAI API and return the response token.

    Reads the API key from the CAPTCHAAI_API_KEY environment variable
    (raises KeyError when unset). Raises RuntimeError on any API error and
    TimeoutError after ~2 minutes of polling.
    """
    api_key = os.environ["CAPTCHAAI_API_KEY"]
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": api_key,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1,
    }, timeout=30)
    submit = resp.json()
    # BUG FIX: the original used submit["request"] as the task id without
    # checking status; on a failed submission (e.g. ERROR_WRONG_USER_KEY)
    # "request" holds an error string and the loop polled a bogus id.
    if submit.get("status") != 1:
        raise RuntimeError(f"CaptchaAI submit failed: {submit.get('request')}")
    task_id = submit["request"]
    time.sleep(15)  # typical minimum solve time before the first poll
    for _ in range(24):  # poll up to ~2 minutes (24 * 5 s)
        resp = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": api_key, "action": "get",
            "id": task_id, "json": 1,
        }, timeout=15)
        data = resp.json()
        if data.get("status") == 1:
            return data["request"]  # the g-recaptcha-response token
        if data["request"] != "CAPCHA_NOT_READY":
            raise RuntimeError(data["request"])
        time.sleep(5)
    raise TimeoutError("Solve timed out")
Review Scraper
# scraper.py
import requests
import re
from bs4 import BeautifulSoup
from solver import solve_recaptcha
from models import Review
class ReviewScraper:
    """Scrapes paginated review listings for one platform using CSS selectors.

    `selectors` maps field names ("card", "author", "text", "rating",
    "date") to CSS selectors; sensible defaults are used for every field
    except "card", which is required.
    """

    def __init__(self, source, url_template, selectors):
        self.source = source
        self.url_template = url_template
        self.selectors = selectors
        self.session = requests.Session()
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
        )

    def scrape(self, product_id, max_pages=5):
        """Collect reviews page by page, stopping at the first empty page."""
        collected = []
        page = 1
        while page <= max_pages:
            page_url = self.url_template.format(product_id=product_id, page=page)
            found = self._parse(self._fetch(page_url), page_url)
            if not found:
                break
            collected.extend(found)
            page += 1
        return collected

    def _fetch(self, url):
        """GET a page; if a reCAPTCHA challenge appears, solve and re-submit."""
        response = self.session.get(url, timeout=20)
        if "data-sitekey" in response.text:
            sitekey_match = re.search(r'data-sitekey="([^"]+)"', response.text)
            if sitekey_match:
                captcha_token = solve_recaptcha(sitekey_match.group(1), url)
                response = self.session.post(
                    url,
                    data={"g-recaptcha-response": captcha_token},
                    timeout=30,
                )
        return response.text

    def _parse(self, html, url):
        """Extract Review objects from one page of review-listing HTML."""
        soup = BeautifulSoup(html, "html.parser")
        sel = self.selectors
        results = []
        for card in soup.select(sel["card"]):
            body_el = card.select_one(sel.get("text", ".review-text"))
            if not body_el:
                continue  # a card without body text is not a usable review
            name_el = card.select_one(sel.get("author", ".author"))
            stars_el = card.select_one(sel.get("rating", ".stars"))
            when_el = card.select_one(sel.get("date", ".review-date"))
            results.append(Review(
                source=self.source,
                author=name_el.get_text(strip=True) if name_el else "Anonymous",
                rating=self._parse_rating(stars_el),
                text=body_el.get_text(strip=True),
                date=when_el.get_text(strip=True) if when_el else "",
                url=url,
            ))
        return results

    def _parse_rating(self, el):
        """Pull a numeric rating from aria-label, falling back to element text.

        Returns 0.0 when the element is missing/empty or contains no number.
        """
        if not el:
            return 0.0
        hit = re.search(r'([\d.]+)', el.get("aria-label", ""))
        if hit is None:
            hit = re.search(r'([\d.]+)', el.get_text(strip=True))
        return float(hit.group(1)) if hit else 0.0
Sentiment Analyzer
# sentiment.py
def analyze_sentiment(text):
    """Simple keyword-based sentiment score in [-1.0, 1.0].

    Counts positive vs. negative keyword hits and returns their normalized
    difference, rounded to 2 decimals; 0.0 when no keyword matches.

    BUG FIX: the original tokenized with text.split(), so any keyword
    followed by punctuation ("great!", "slow.") was never matched. Tokens
    are now stripped of surrounding punctuation. Keyword lists are
    frozensets for O(1) membership instead of O(n) list scans.
    """
    positive = frozenset({
        "great", "excellent", "amazing", "love", "perfect",
        "fast", "easy", "reliable", "recommend", "best",
    })
    negative = frozenset({
        "terrible", "awful", "slow", "broken", "worst",
        "hate", "useless", "scam", "waste", "horrible",
    })
    # Lowercase, split on whitespace, then strip surrounding punctuation
    # so "Great!" and "slow." match their keywords.
    words = [w.strip(".,!?;:()[]\"'-") for w in text.lower().split()]
    pos = sum(1 for w in words if w in positive)
    neg = sum(1 for w in words if w in negative)
    total = pos + neg
    if total == 0:
        return 0.0
    return round((pos - neg) / total, 2)
Alert System
# alerter.py
import smtplib
from email.mime.text import MIMEText
def send_alert(subject, body, to_email, smtp_config):
    """Deliver a plain-text alert email over SMTP with STARTTLS.

    smtp_config must provide "host", "port", "user", "password", and "from".
    """
    message = MIMEText(body)
    message["From"] = smtp_config["from"]
    message["To"] = to_email
    message["Subject"] = subject
    with smtplib.SMTP(smtp_config["host"], smtp_config["port"]) as server:
        server.starttls()
        server.login(smtp_config["user"], smtp_config["password"])
        server.send_message(message)
def check_and_alert(db, source, threshold=-0.3, to_email="team@example.com"):
    """Email an alert when a source's 7-day average sentiment drops below threshold.

    db: a ReviewDB-like object exposing recent_sentiment(source, days).
    Returns None; it is a no-op when there are no recent reviews or the
    average sentiment is at/above the threshold.
    """
    import os  # local import: keeps the module's top-level deps unchanged

    avg_sentiment, count = db.recent_sentiment(source, days=7)
    # Guard clause: nothing to report (None means zero matching rows).
    if avg_sentiment is None or avg_sentiment >= threshold:
        return
    send_alert(
        subject=f"Review Alert: {source} sentiment dropped to {avg_sentiment:.2f}",
        body=f"Average sentiment over {count} reviews in the past 7 days: {avg_sentiment:.2f}\n"
             f"Threshold: {threshold}",
        to_email=to_email,
        # SECURITY FIX: the original hard-coded the SMTP password in source.
        # Credentials now come from the environment; host/user/from defaults
        # preserve the old values.
        smtp_config={
            "host": os.environ.get("SMTP_HOST", "smtp.example.com"),
            "port": int(os.environ.get("SMTP_PORT", "587")),
            "user": os.environ.get("SMTP_USER", "alerts@example.com"),
            "password": os.environ.get("SMTP_PASSWORD", ""),
            "from": os.environ.get("SMTP_FROM", "alerts@example.com"),
        },
    )
Main Runner
# main.py
import time
from models import ReviewDB
from scraper import ReviewScraper
from sentiment import analyze_sentiment
from alerter import check_and_alert
# Platform configurations. Each entry tells ReviewScraper where to fetch
# pages and which CSS selectors locate the review fields in the HTML.
SOURCES = [
    {
        "source": "Platform A",
        # {product_id} and {page} are filled in by ReviewScraper.scrape().
        "url_template": "https://platform-a.example.com/product/{product_id}/reviews?page={page}",
        "selectors": {
            "card": ".review-card",       # one element per review (required)
            "author": ".reviewer-name",
            "text": ".review-body",
            "rating": ".star-rating",
            "date": ".review-date",
        },
        # Product identifiers to monitor on this platform.
        "products": ["product-123", "product-456"],
    },
]
def main():
    """Scrape every configured source/product, score sentiment, and alert."""
    db = ReviewDB()
    for config in SOURCES:
        scraper = ReviewScraper(config["source"], config["url_template"], config["selectors"])
        for product_id in config["products"]:
            print(f"Scraping {config['source']} — {product_id}")
            for review in scraper.scrape(product_id):
                review.sentiment = analyze_sentiment(review.text)
                if db.save(review):
                    print(f" New: {review.rating}★ sentiment={review.sentiment}")
            time.sleep(5)  # polite pause between products
        check_and_alert(db, config["source"])
    print("Done. Check alerts.")


if __name__ == "__main__":
    main()
FAQ
How accurate is keyword-based sentiment?
Good enough for trend detection. For production, swap in a model like transformers or a sentiment API for higher accuracy.
Can I monitor reviews hourly?
You can, but most review sites update slowly. Running every 4-12 hours is usually sufficient and reduces CAPTCHA solve costs.
How do I add a new review platform?
Add a new entry in SOURCES with the URL template and CSS selectors matching the platform's review page HTML.
Related Guides
Monitor reviews at scale — start with CaptchaAI.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In · No comments yet.