RabbitMQ provides guaranteed delivery, message acknowledgment, and sophisticated routing for CAPTCHA solving workloads. This guide builds a production-ready integration.
Why RabbitMQ for CAPTCHA Solving
| Feature | Benefit |
|---|---|
| Durable queues + persistent messages | Tasks survive broker restarts |
| Message acknowledgment | No lost tasks on worker crash |
| Dead letter exchange | Failed tasks routed for investigation |
| Priority queues | Urgent CAPTCHAs solved first (the queue must be declared with `x-max-priority`) |
| Routing keys | Route by CAPTCHA type to specialized workers |
Setup
# Docker: start a RabbitMQ broker with the management plugin
# (AMQP on 5672, management web UI on 15672)
docker run --detach \
  --hostname rabbitmq \
  --publish 5672:5672 \
  --publish 15672:15672 \
  rabbitmq:3-management
# Python client libraries used by the snippets below
pip install pika requests
Producer: Submit Tasks
import json
import uuid
import pika
class CaptchaProducer:
    """Submit CAPTCHA tasks to RabbitMQ.

    On construction this opens a blocking connection, enables publisher
    confirms and declares the task topology:

    * ``captcha.tasks``   - durable priority work queue. Messages that expire
      (5 min TTL) or are rejected without requeue are dead-lettered to
      ``captcha.dlx`` with routing key ``failed``.
    * ``captcha.failed``  - durable queue bound to ``captcha.dlx`` / ``failed``.
    * ``captcha.results`` - durable queue that workers publish results to.

    Usable as a context manager so the connection is always closed.
    """

    # Highest priority honoured by captcha.tasks. RabbitMQ only orders by
    # priority on queues declared with x-max-priority; without it the
    # ``priority`` passed to submit() is silently ignored.
    MAX_PRIORITY = 10

    def __init__(self, rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url),
        )
        self.channel = self.connection.channel()
        # Publisher confirms: basic_publish waits for the broker to accept
        # each message and raises if it is nacked, instead of losing it silently.
        self.channel.confirm_delivery()
        self._setup_queues()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _setup_queues(self):
        """Declare durable queues and exchanges (re-declaring identical ones is a no-op)."""
        # Dead letter exchange for failed tasks
        self.channel.exchange_declare(
            exchange="captcha.dlx",
            exchange_type="direct",
            durable=True,
        )
        self.channel.queue_declare(queue="captcha.failed", durable=True)
        self.channel.queue_bind(
            queue="captcha.failed",
            exchange="captcha.dlx",
            routing_key="failed",
        )
        # Main task queue with dead letter routing and priority support.
        # NOTE: queue arguments are fixed when the queue is created. If
        # captcha.tasks already exists without x-max-priority, this declare
        # fails with PRECONDITION_FAILED; drain and delete the queue first.
        self.channel.queue_declare(
            queue="captcha.tasks",
            durable=True,
            arguments={
                "x-dead-letter-exchange": "captcha.dlx",
                "x-dead-letter-routing-key": "failed",
                "x-message-ttl": 300000,  # 5 min TTL, then dead-lettered
                "x-max-priority": self.MAX_PRIORITY,
            },
        )
        # Results queue
        self.channel.queue_declare(queue="captcha.results", durable=True)

    def submit(self, method, params, priority=0):
        """Publish a persistent CAPTCHA task to ``captcha.tasks``.

        Args:
            method: CaptchaAI ``method`` value, e.g. ``"userrecaptcha"``.
            params: Extra request parameters forwarded to the solver.
            priority: 0..MAX_PRIORITY; higher values are delivered first.

        Returns:
            The task id (a full UUID4 string), also sent as ``message_id``.
        """
        # Use the full UUID: the old 8-character prefix could collide, and
        # results are keyed by this id.
        task_id = str(uuid.uuid4())
        task = {
            "id": task_id,
            "method": method,
            "params": params,
        }
        self.channel.basic_publish(
            exchange="",
            routing_key="captcha.tasks",
            body=json.dumps(task),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                priority=priority,
                message_id=task_id,
                content_type="application/json",
            ),
        )
        return task_id

    def close(self):
        """Close the broker connection (safe to call more than once)."""
        if self.connection.is_open:
            self.connection.close()
# Usage
producer = CaptchaProducer()
try:
    task_id = producer.submit(
        "userrecaptcha",
        {
            "googlekey": "SITE_KEY",
            "pageurl": "https://example.com",
        },
        priority=5,
    )
    print(f"Submitted: {task_id}")
finally:
    # Close the connection even if submit() raises.
    producer.close()
Consumer: Worker
import json
import os
import time
import pika
import requests
class CaptchaConsumer:
    """RabbitMQ consumer that solves CAPTCHAs.

    Takes tasks from ``captcha.tasks``, solves them through the CaptchaAI
    HTTP API, publishes the outcome to ``captcha.results`` and then acks
    the task. Any failure, including a malformed message, is nacked with
    ``requeue=False`` so the broker dead-letters it.

    NOTE: this class declares no queues. It assumes ``captcha.tasks`` and
    ``captcha.results`` already exist (e.g. declared by the producer).
    """

    def __init__(self, api_key, rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url),
        )
        self.channel = self.connection.channel()
        # Hold at most one unacknowledged task, so queued work is not
        # parked on this worker while it is busy with a slow solve.
        self.channel.basic_qos(prefetch_count=1)

    def start(self):
        """Consume tasks until consuming stops, then close the connection."""
        self.channel.basic_consume(
            queue="captcha.tasks",
            on_message_callback=self._handle_task,
        )
        print("Worker started. Waiting for tasks...")
        try:
            self.channel.start_consuming()
        finally:
            self.close()

    def close(self):
        """Close the broker connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()

    def _handle_task(self, ch, method, properties, body):
        """Process one delivery: solve, publish the result, then ack.

        The result is published before the ack, so a crash between the two
        may duplicate a result but never loses a task.
        """
        # Fall back to the AMQP message_id so a message whose body cannot be
        # parsed can still be identified in the log.
        task_id = getattr(properties, "message_id", None)
        try:
            # Parsing happens inside the try: a malformed message is
            # dead-lettered instead of escaping the callback and crashing the
            # worker (which would redeliver it forever).
            task = json.loads(body)
            task_id = task["id"]
            print(f"Processing {task_id}...")
            token = self._solve(task["method"], task["params"])
            result = {
                "task_id": task_id,
                "status": "success",
                "token": token,
            }
            ch.basic_publish(
                exchange="",
                routing_key="captcha.results",
                body=json.dumps(result),
                properties=pika.BasicProperties(delivery_mode=2),
            )
            # Acknowledge message (remove from queue)
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(f"{task_id} solved successfully")
        except Exception as e:
            print(f"{task_id} failed: {e}")
            # Reject without requeue so the queue's DLX receives it.
            ch.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False,
            )

    def _solve(self, captcha_method, params, timeout=120):
        """Submit a CAPTCHA to CaptchaAI and poll until it is solved.

        Returns:
            The solution string from the ``request`` field.

        Raises:
            requests.HTTPError: on a non-2xx HTTP response.
            RuntimeError: when the API reports an error.
            TimeoutError: when no answer arrives within ``timeout`` seconds.
        """
        resp = requests.post(f"{self.base}/in.php", data={
            "key": self.api_key,
            "method": captcha_method,
            "json": 1,
            **params,
        }, timeout=30)
        resp.raise_for_status()
        result = resp.json()
        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))
        captcha_id = result["request"]

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # connection.sleep keeps servicing AMQP heartbeats while waiting.
            # time.sleep would block pika's I/O loop for the whole solve, and
            # the broker can drop the connection on missed heartbeats.
            self.connection.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            resp.raise_for_status()
            data = resp.json()
            answer = data.get("request")
            if answer == "CAPCHA_NOT_READY":
                continue
            if data.get("status") == 1:
                return answer
            raise RuntimeError(answer)
        raise TimeoutError("Solve timeout")
# Run worker
if __name__ == "__main__":
    api_key = os.environ.get("CAPTCHAAI_KEY")
    if not api_key:
        # Fail with a clear message instead of a bare KeyError traceback.
        raise SystemExit("CAPTCHAAI_KEY environment variable is not set")
    consumer = CaptchaConsumer(api_key)
    consumer.start()
Result Collector
import json
import time

import pika
class ResultCollector:
    """Collect task results from the ``captcha.results`` queue.

    Messages are fetched with ``basic_get(auto_ack=True)``, so each result
    read is removed from the queue. It is kept in ``self.results`` (keyed by
    ``task_id``) across calls, even when it belongs to a task the current
    caller is not waiting for.
    """

    def __init__(self, rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url),
        )
        self.channel = self.connection.channel()
        # task_id -> result dict, accumulated over every collect() call.
        self.results = {}

    def collect(self, expected_count, timeout=120, task_ids=None):
        """Poll the results queue until enough results arrive or time runs out.

        Args:
            expected_count: Stop once ``self.results`` holds at least this many
                entries. Ignored when ``task_ids`` is given.
            timeout: Maximum number of seconds to keep polling.
            task_ids: Optional iterable of task ids. When given, stop as soon
                as a result for every one of them has been collected.

        Returns:
            ``self.results``, which may be incomplete if the timeout expired.
        """
        wanted = None if task_ids is None else set(task_ids)

        def satisfied():
            if wanted is not None:
                return wanted.issubset(self.results)
            return len(self.results) >= expected_count

        deadline = time.monotonic() + timeout
        while not satisfied() and time.monotonic() < deadline:
            _method, _properties, body = self.channel.basic_get(
                queue="captcha.results",
                auto_ack=True,
            )
            if not body:
                # Queue empty: back off briefly before polling again.
                time.sleep(0.5)
                continue
            try:
                result = json.loads(body)
                self.results[result["task_id"]] = result
            except (ValueError, KeyError, TypeError) as e:
                # Already auto-acked, so the message cannot be recovered;
                # report it and keep collecting instead of crashing.
                print(f"Skipping malformed result: {e}")
        return self.results

    def close(self):
        """Close the broker connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()
Type-Based Routing
Route different CAPTCHA types to specialized workers:
# Setup exchanges and queues
# Assumes `channel` is an open pika BlockingChannel (e.g. CaptchaProducer().channel).
channel.exchange_declare(
    exchange="captcha.types",
    exchange_type="direct",
    durable=True,
)
# Make sure the dead-letter target exists. Re-declaring with identical
# arguments is a no-op if the producer already created it.
channel.exchange_declare(
    exchange="captcha.dlx",
    exchange_type="direct",
    durable=True,
)
channel.queue_declare(queue="captcha.failed", durable=True)
channel.queue_bind(
    queue="captcha.failed",
    exchange="captcha.dlx",
    routing_key="failed",
)
# One queue per type. Each dead-letters like captcha.tasks does, so a worker
# that rejects a routed task with requeue=False parks it in captcha.failed
# instead of the broker discarding it.
# NOTE: if these queues already exist without these arguments, the declare
# fails with PRECONDITION_FAILED; drain and delete them first.
for captcha_type in ("recaptcha", "turnstile", "image"):
    queue_name = f"captcha.{captcha_type}"
    channel.queue_declare(
        queue=queue_name,
        durable=True,
        arguments={
            "x-dead-letter-exchange": "captcha.dlx",
            "x-dead-letter-routing-key": "failed",
        },
    )
    channel.queue_bind(
        queue=queue_name,
        exchange="captcha.types",
        routing_key=captcha_type,
    )
# Submit with routing
def submit_routed(channel, captcha_type, task):
    """Publish *task* as a persistent JSON message to the ``captcha.types`` exchange.

    The direct exchange delivers it to the queue bound with routing key
    ``captcha_type``. An unknown type matches no binding, and because
    ``mandatory`` is not set the message is not returned to the publisher.
    """
    payload = json.dumps(task)
    persistent = pika.BasicProperties(delivery_mode=2)
    channel.basic_publish(
        exchange="captcha.types",
        routing_key=captcha_type,
        body=payload,
        properties=persistent,
    )
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Messages lost on broker crash/restart | Non-durable queue or non-persistent messages | Set durable=True on the queue and delivery_mode=2 on every message |
| Queued tasks wait behind one slow solve | Worker prefetched several messages | Set prefetch_count=1 so each worker holds only the task it is solving |
| Dead letter queue growing | Persistent failures | Review failed tasks and fix params |
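| Connection drops during long solves | Heartbeat timeout while the worker blocks | Set a heartbeat interval, wait with connection.sleep() instead of time.sleep(), and add reconnect logic |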
FAQ
When should I use RabbitMQ over Redis?
Use RabbitMQ when you need guaranteed delivery, dead letter routing, or type-based message routing. Use Redis for simpler setups with lower latency.
How many consumers should I run?
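Consumers spend nearly all their time waiting on the CaptchaAI API (submitting and polling), so the workload is I/O-bound and CPU core count is not the limiting factor. Each consumer handles one task at a time (prefetch_count=1), so the number of consumers equals the number of CAPTCHAs solved in parallel. Scale it to your queue depth and to whatever concurrency limit your CaptchaAI account allows.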
Can I retry failed tasks automatically?
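Yes, but not with the setup above, where rejected tasks simply accumulate in `captcha.failed`. Instead, dead-letter rejected tasks to a retry queue that has an `x-message-ttl` and whose own dead-letter exchange routes back to `captcha.tasks`. A rejected task then waits out the TTL and returns to the main queue automatically. Cap the number of attempts (for example by inspecting the `x-death` header) so a permanently failing task does not loop forever.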
Related Guides
Reliable queuing — start with CaptchaAI and RabbitMQ.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In
No comments yet.