Redis provides fast, reliable queuing for distributing CAPTCHA tasks across multiple workers. This guide builds a complete producer-consumer system with result tracking and error handling.
Architecture
Producers → Redis List (FIFO queue) → Workers → CaptchaAI API
↓
Redis Hash (results)
↓
Redis Pub/Sub (notifications)
Task Queue Manager
import json
import time
import uuid
import redis
class CaptchaQueue:
    """Redis-backed CAPTCHA task queue.

    Tasks travel through a Redis list (FIFO); results land in a Redis
    hash, a per-task marker key carries the 1-hour expiry, and completion
    is broadcast on a pub/sub channel.
    """

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"
        self.notify_channel = "captcha:done"

    def submit(self, method, params, priority="normal"):
        """Submit a CAPTCHA task; return the generated 8-char task id.

        "high" priority tasks are pushed to the head of the list, so
        workers (which pop from the left) pick them up first.
        """
        task_id = str(uuid.uuid4())[:8]
        task = {
            "id": task_id,
            "method": method,
            "params": params,
            "submitted_at": time.time(),
            "priority": priority,
        }
        payload = json.dumps(task)
        if priority == "high":
            self.redis.lpush(self.queue_key, payload)
        else:
            self.redis.rpush(self.queue_key, payload)
        return task_id

    def fetch(self, timeout=30):
        """Fetch the next task (blocking up to `timeout` seconds).

        Returns the task dict, or None if the queue stayed empty.
        """
        result = self.redis.blpop(self.queue_key, timeout=timeout)
        if result is None:
            return None
        _, raw = result
        return json.loads(raw)

    def store_result(self, task_id, result):
        """Store a task result, arm its expiry marker, notify listeners."""
        # Set the TTL marker BEFORE writing the result: get_result treats a
        # missing marker as "expired", so writing the result first would
        # open a window where a fresh result gets deleted.
        self.redis.setex(f"captcha:ttl:{task_id}", 3600, "1")
        self.redis.hset(self.results_key, task_id, json.dumps(result))
        # Notify via pub/sub
        self.redis.publish(self.notify_channel, task_id)

    def get_result(self, task_id):
        """Get a task result (non-blocking); None if absent or expired.

        Hash fields cannot carry a per-field TTL, so expiry is enforced
        lazily here: once the companion marker key has expired, the stale
        hash entry is deleted. (Previously the marker expired but the
        result stayed in the hash forever — an unbounded leak.)
        """
        raw = self.redis.hget(self.results_key, task_id)
        if raw is None:
            return None
        if not self.redis.exists(f"captcha:ttl:{task_id}"):
            self.redis.hdel(self.results_key, task_id)
            return None
        return json.loads(raw)

    def wait_result(self, task_id, timeout=120):
        """Poll for a result once a second; None on timeout."""
        start = time.time()
        while time.time() - start < timeout:
            result = self.get_result(task_id)
            if result:
                return result
            time.sleep(1)
        return None

    def queue_stats(self):
        """Return queue statistics: pending tasks and stored results."""
        return {
            "pending": self.redis.llen(self.queue_key),
            "completed": self.redis.hlen(self.results_key),
        }
Worker Process
import os
import time
import requests
class QueueWorker:
    """Worker that processes CAPTCHA tasks from Redis queue."""

    def __init__(self, api_key, queue):
        self.api_key = api_key
        self.queue = queue
        self.base = "https://ocr.captchaai.com"

    def run(self):
        """Main worker loop: pull tasks forever and record each outcome."""
        pid = os.getpid()
        print(f"Worker {pid} started")
        while True:
            task = self.queue.fetch(timeout=30)
            if task is None:
                continue  # fetch timed out on an empty queue; try again
            self._process(pid, task)

    def _process(self, pid, task):
        """Solve one task; store success or error under its task id."""
        task_id = task["id"]
        print(f"[{pid}] Processing {task_id}")
        started = time.time()
        try:
            token = self._solve(task["method"], task["params"])
            elapsed = time.time() - started
            self.queue.store_result(task_id, {
                "status": "success",
                "token": token,
                "duration": f"{elapsed:.1f}s",
            })
            print(f"[{pid}] {task_id} solved in {elapsed:.1f}s")
        except Exception as e:
            self.queue.store_result(task_id, {
                "status": "error",
                "error": str(e),
            })
            print(f"[{pid}] {task_id} failed: {e}")

    def _solve(self, method, params, timeout=120):
        """Submit to CaptchaAI, then poll until solved or `timeout` elapses."""
        resp = requests.post(f"{self.base}/in.php", data={
            "key": self.api_key,
            "method": method,
            "json": 1,
            **params,
        }, timeout=30)
        result = resp.json()
        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))
        captcha_id = result["request"]
        deadline = time.time() + timeout
        while time.time() < deadline:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] == "CAPCHA_NOT_READY":
                continue
            if data.get("status") == 1:
                return data["request"]
            raise RuntimeError(data["request"])
        raise TimeoutError("Solve timeout")
# Run worker
if __name__ == "__main__":
    # Standalone entry point: one worker bound to the local Redis queue.
    # CAPTCHAAI_KEY must be set in the environment (KeyError otherwise).
    task_queue = CaptchaQueue()
    QueueWorker(os.environ["CAPTCHAAI_KEY"], task_queue).run()
Multi-Worker Launcher
import multiprocessing
import os
def start_workers(num_workers=4):
    """Launch `num_workers` worker processes.

    Each child builds its own CaptchaQueue (and Redis connection) inside
    run_worker — connections are not shared across process boundaries.
    The unused parent-side CaptchaQueue the original created here opened
    a pointless Redis connection and has been removed.

    Returns the list of started multiprocessing.Process handles so the
    caller can join() or terminate() them.
    """
    processes = []
    for i in range(num_workers):
        p = multiprocessing.Process(
            target=run_worker,
            args=(os.environ["CAPTCHAAI_KEY"],),
        )
        p.start()
        processes.append(p)
        print(f"Started worker {i + 1}/{num_workers}")
    return processes
def run_worker(api_key):
    """Process entry point: build a fresh queue connection, run one worker."""
    QueueWorker(api_key, CaptchaQueue()).run()
# Launch four workers; keep the Process handles so the parent can
# join() or terminate() them later.
processes = start_workers(num_workers=4)
Producer Example
# Producer: enqueue one reCAPTCHA task per target page, then collect results.
queue = CaptchaQueue()

# Submit tasks
urls = [
    "https://site1.com/login",
    "https://site2.com/register",
    "https://site3.com/checkout",
]
task_ids = []
for page in urls:
    task_id = queue.submit("userrecaptcha", {
        "googlekey": "SITE_KEY",
        "pageurl": page,
    })
    task_ids.append(task_id)
    print(f"Submitted {task_id} for {page}")

# Wait for all results
for task_id in task_ids:
    outcome = queue.wait_result(task_id, timeout=120)
    print(f"{task_id}: {outcome['status'] if outcome else 'timeout'}")

# Check queue stats
print(queue.queue_stats())
Pub/Sub Result Listener
Get notified immediately when tasks complete:
import threading
def listen_results(queue):
    """Listen for completed-task notifications and print each outcome.

    Blocks for as long as the pub/sub connection yields messages; run it
    on a daemon thread.
    """
    pubsub = queue.redis.pubsub()
    pubsub.subscribe(queue.notify_channel)
    for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe confirmations etc.
        data = message["data"]
        # redis-py delivers bytes unless decode_responses=True was set on
        # the connection; handle both so either configuration works.
        task_id = data.decode() if isinstance(data, bytes) else data
        result = queue.get_result(task_id)
        # Bug fix: get_result can return None (result expired or not yet
        # visible) — indexing None crashed the listener thread.
        status = result["status"] if result else "unknown"
        print(f"Task {task_id} completed: {status}")
# Run listener in background.
# daemon=True: the thread blocks forever in pubsub.listen(), and a daemon
# thread lets the interpreter exit without joining it.
listener_thread = threading.Thread(
    target=listen_results,
    args=(CaptchaQueue(),),
    daemon=True,
)
listener_thread.start()
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Workers idle with tasks in queue | Connection to wrong Redis | Verify REDIS_URL |
| Results disappear | No TTL management | Use setex for result expiry |
| Queue grows unbounded | Workers too slow | Add more workers or increase concurrency |
| Duplicate processing | Task popped but worker crashed before storing a result | Use BLMOVE (successor to the deprecated BRPOPLPUSH) for a reliable-queue pattern |
FAQ
Why Redis over other message queues?
Redis is simple, fast, and most teams already run it. For complex routing or guaranteed delivery, consider RabbitMQ instead.
How many workers per Redis instance?
A single Redis instance handles 100k+ operations/second. The bottleneck is CaptchaAI API throughput, not Redis. Plan workers based on your CaptchaAI capacity.
Should I use Redis Streams instead of Lists?
Redis Streams provide consumer groups and acknowledgment, which is better for production. Lists work well for simpler setups.
Related Guides
Distribute your workload — get CaptchaAI today.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In · No comments yet.