DevOps & Scaling

RabbitMQ + CaptchaAI: Message Queue Integration

RabbitMQ provides guaranteed delivery, message acknowledgment, and sophisticated routing for CAPTCHA solving workloads. This guide builds a production-ready integration.


Why RabbitMQ for CAPTCHA Solving

Feature Benefit
Durable queues Tasks survive broker restarts
Message acknowledgment No lost tasks on worker crash
Dead letter exchange Failed tasks routed for investigation
Priority queues Urgent CAPTCHAs solved first
Routing keys Route by CAPTCHA type to specialized workers

Setup

# Docker
docker run -d --hostname rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management

# Python client
pip install pika requests

Producer: Submit Tasks

import json
import uuid
import pika


class CaptchaProducer:
    """Submit CAPTCHA tasks to RabbitMQ."""

    def __init__(self, rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url),
        )
        self.channel = self.connection.channel()
        self._setup_queues()

    def _setup_queues(self):
        """Declare durable queues and exchanges."""
        # Dead letter exchange for failed tasks
        self.channel.exchange_declare(
            exchange="captcha.dlx",
            exchange_type="direct",
            durable=True,
        )
        self.channel.queue_declare(
            queue="captcha.failed",
            durable=True,
        )
        self.channel.queue_bind(
            queue="captcha.failed",
            exchange="captcha.dlx",
            routing_key="failed",
        )

        # Main task queue with dead letter routing
        self.channel.queue_declare(
            queue="captcha.tasks",
            durable=True,
            arguments={
                "x-dead-letter-exchange": "captcha.dlx",
                "x-dead-letter-routing-key": "failed",
                "x-message-ttl": 300000,  # 5 min TTL
            },
        )

        # Results queue
        self.channel.queue_declare(
            queue="captcha.results",
            durable=True,
        )

    def submit(self, method, params, priority=0):
        """Submit a CAPTCHA task."""
        task_id = str(uuid.uuid4())[:8]
        task = {
            "id": task_id,
            "method": method,
            "params": params,
        }

        self.channel.basic_publish(
            exchange="",
            routing_key="captcha.tasks",
            body=json.dumps(task),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                priority=priority,
                message_id=task_id,
            ),
        )
        return task_id

    def close(self):
        self.connection.close()


# Usage
producer = CaptchaProducer()

task_id = producer.submit("userrecaptcha", {
    "googlekey": "SITE_KEY",
    "pageurl": "https://example.com",
}, priority=5)

print(f"Submitted: {task_id}")
producer.close()

Consumer: Worker

import json
import os
import time
import pika
import requests


class CaptchaConsumer:
    """RabbitMQ consumer that solves CAPTCHAs."""

    def __init__(self, api_key, rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url),
        )
        self.channel = self.connection.channel()
        # Process one task at a time
        self.channel.basic_qos(prefetch_count=1)

    def start(self):
        """Start consuming tasks."""
        self.channel.basic_consume(
            queue="captcha.tasks",
            on_message_callback=self._handle_task,
        )
        print("Worker started. Waiting for tasks...")
        self.channel.start_consuming()

    def _handle_task(self, ch, method, properties, body):
        """Process a single CAPTCHA task."""
        task = json.loads(body)
        task_id = task["id"]
        print(f"Processing {task_id}...")

        try:
            token = self._solve(task["method"], task["params"])

            # Publish result
            result = {
                "task_id": task_id,
                "status": "success",
                "token": token,
            }
            ch.basic_publish(
                exchange="",
                routing_key="captcha.results",
                body=json.dumps(result),
                properties=pika.BasicProperties(delivery_mode=2),
            )

            # Acknowledge message (remove from queue)
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(f"{task_id} solved successfully")

        except Exception as e:
            print(f"{task_id} failed: {e}")

            # Reject and send to dead letter queue
            ch.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False,  # Goes to DLX
            )

    def _solve(self, captcha_method, params, timeout=120):
        resp = requests.post(f"{self.base}/in.php", data={
            "key": self.api_key,
            "method": captcha_method,
            "json": 1,
            **params,
        }, timeout=30)
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")


# Run worker
if __name__ == "__main__":
    consumer = CaptchaConsumer(os.environ["CAPTCHAAI_KEY"])
    consumer.start()

Result Collector

import json
import pika


class ResultCollector:
    """Collect task results from the results queue."""

    def __init__(self, rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url),
        )
        self.channel = self.connection.channel()
        self.results = {}

    def collect(self, expected_count, timeout=120):
        """Collect a specific number of results."""
        deadline = time.time() + timeout

        while len(self.results) < expected_count and time.time() < deadline:
            method, _, body = self.channel.basic_get(
                queue="captcha.results",
                auto_ack=True,
            )
            if body:
                result = json.loads(body)
                self.results[result["task_id"]] = result

            time.sleep(0.5)

        return self.results

Type-Based Routing

Route different CAPTCHA types to specialized workers:

# Setup exchanges and queues
channel.exchange_declare(
    exchange="captcha.types",
    exchange_type="direct",
    durable=True,
)

# Queue per type
for captcha_type in ["recaptcha", "turnstile", "image"]:
    channel.queue_declare(queue=f"captcha.{captcha_type}", durable=True)
    channel.queue_bind(
        queue=f"captcha.{captcha_type}",
        exchange="captcha.types",
        routing_key=captcha_type,
    )


# Submit with routing
def submit_routed(channel, captcha_type, task):
    channel.basic_publish(
        exchange="captcha.types",
        routing_key=captcha_type,
        body=json.dumps(task),
        properties=pika.BasicProperties(delivery_mode=2),
    )

Troubleshooting

Issue Cause Fix
Messages lost on crash Non-durable queue Set durable=True and delivery_mode=2
Worker stuck on one task Long CAPTCHA solve Set prefetch_count=1 per worker
Dead letter queue growing Persistent failures Review failed tasks and fix params
Connection drops Heartbeat timeout Set heartbeat interval, add reconnect logic

FAQ

When should I use RabbitMQ over Redis?

Use RabbitMQ when you need guaranteed delivery, dead letter routing, or type-based message routing. Use Redis for simpler setups with lower latency.

How many consumers should I run?

One consumer per CPU core works well. Each consumer processes one task at a time (prefetch_count=1), so 4 cores = 4 consumers.

Can I retry failed tasks automatically?

Yes. Configure a retry exchange with a TTL delay. Messages rejected by workers get delayed and re-queued automatically.



Reliable queuing — start with CaptchaAI and RabbitMQ.

Discussions (0)

No comments yet.

Related Posts

DevOps & Scaling Blue-Green Deployment for CAPTCHA Solving Infrastructure
Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switching, and rollback strategies with Captcha AI.

Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switchin...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Ansible Playbooks for CaptchaAI Worker Deployment
Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates, and health checks across your server fleet.

Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Rolling Updates for CAPTCHA Solving Worker Fleets
Implement rolling updates for CAPTCHA solving worker fleets — zero-downtime upgrades, graceful draining, health-gated progression, and automatic rollback.

Implement rolling updates for CAPTCHA solving worker fleets — zero-downtime upgrades, graceful draining, healt...

Automation Python All CAPTCHA Types
Feb 28, 2026
DevOps & Scaling AWS Lambda + CaptchaAI: Serverless CAPTCHA Solving
Integrate Captcha AI with AWS Lambda for serverless CAPTCHA solving.

Integrate Captcha AI with AWS Lambda for serverless CAPTCHA solving. Deploy functions, manage API keys with Se...

Automation Python All CAPTCHA Types
Feb 17, 2026
DevOps & Scaling Auto-Scaling CAPTCHA Solving Workers
Build auto-scaling CAPTCHA solving workers that adjust capacity based on queue depth, balance, and solve rates.

Build auto-scaling CAPTCHA solving workers that adjust capacity based on queue depth, balance, and solve rates...

Automation Python All CAPTCHA Types
Mar 23, 2026
Tutorials Queue-Based Batch CAPTCHA Processing with Priority Levels
How to build a priority queue system for CAPTCHA solving — assign priority levels to tasks, process high-priority requests first, manage concurrency limits, and...

How to build a priority queue system for CAPTCHA solving — assign priority levels to tasks, process high-prior...

Automation Python All CAPTCHA Types
Mar 20, 2026
DevOps & Scaling Terraform + CaptchaAI: Infrastructure as Code for CAPTCHA Workers
Deploy CAPTCHA solving infrastructure with Terraform — provision cloud workers, configure auto-scaling, manage secrets, and version your Captcha AI setup as cod...

Deploy CAPTCHA solving infrastructure with Terraform — provision cloud workers, configure auto-scaling, manage...

Automation Python All CAPTCHA Types
Mar 15, 2026
DevOps & Scaling Horizontal Scaling CAPTCHA Solving Workers: When and How
Scale CAPTCHA solving horizontally — identify bottlenecks, add workers dynamically, auto-scale based on queue depth, and manage costs with Captcha AI.

Scale CAPTCHA solving horizontally — identify bottlenecks, add workers dynamically, auto-scale based on queue...

Automation Python All CAPTCHA Types
Mar 07, 2026
DevOps & Scaling CaptchaAI Behind a Load Balancer: Architecture Patterns
Architect CAPTCHA solving workers behind a load balancer — routing strategies, health checks, sticky sessions, and scaling patterns with Captcha AI.

Architect CAPTCHA solving workers behind a load balancer — routing strategies, health checks, sticky sessions,...

Automation Python All CAPTCHA Types
Feb 24, 2026
DevOps & Scaling CaptchaAI Monitoring with Datadog: Metrics and Alerts
Monitor Captcha AI performance with Datadog — custom metrics, dashboards, anomaly detection alerts, and solve rate tracking for CAPTCHA solving pipelines.

Monitor Captcha AI performance with Datadog — custom metrics, dashboards, anomaly detection alerts, and solve...

Automation Python All CAPTCHA Types
Feb 19, 2026
DevOps & Scaling NATS Messaging + CaptchaAI: Lightweight CAPTCHA Task Distribution
Use NATS messaging for lightweight, high-performance CAPTCHA task distribution — publish tasks, distribute to workers, and collect results with Captcha AI.

Use NATS messaging for lightweight, high-performance CAPTCHA task distribution — publish tasks, distribute to...

Automation Python All CAPTCHA Types
Jan 24, 2026
DevOps & Scaling Building Event-Driven CAPTCHA Solving with AWS SNS and CaptchaAI
Build an event-driven CAPTCHA solving pipeline using AWS SNS for fan-out notifications and Captcha AI — decouple task submission from result processing.

Build an event-driven CAPTCHA solving pipeline using AWS SNS for fan-out notifications and Captcha AI — decoup...

Automation Python All CAPTCHA Types
Jan 21, 2026
DevOps & Scaling Disaster Recovery Planning for CAPTCHA Solving Pipelines
Build a disaster recovery plan for CAPTCHA solving pipelines — RPO/RTO targets, backup strategies, failover automation, and recovery runbooks with Captcha AI.

Build a disaster recovery plan for CAPTCHA solving pipelines — RPO/RTO targets, backup strategies, failover au...

Automation Python All CAPTCHA Types
Jan 20, 2026