DevOps & Scaling

Redis Queue + CaptchaAI: Distributed CAPTCHA Processing

Redis provides fast, reliable queuing for distributing CAPTCHA tasks across multiple workers. This guide builds a complete producer-consumer system with result tracking and error handling.


Architecture

Producers → Redis List (FIFO queue) → Workers → CaptchaAI API
                                          ↓
                                   Redis Hash (results)
                                          ↓
                                   Redis Pub/Sub (notifications)

Task Queue Manager

import json
import time
import uuid
import redis


class CaptchaQueue:
    """Redis-backed CAPTCHA task queue.

    Tasks go onto a Redis list (FIFO; high-priority tasks jump the line via
    LPUSH). Each result is stored under its own key with a real TTL, and
    completions are announced on a pub/sub channel.
    """

    RESULT_TTL = 3600  # seconds a stored result remains readable

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"      # prefix for per-task result keys
        self.completed_key = "captcha:completed"  # monotonic completion counter
        self.notify_channel = "captcha:done"

    def _result_key(self, task_id):
        """Redis key that holds the JSON-encoded result for one task."""
        return f"{self.results_key}:{task_id}"

    def submit(self, method, params, priority="normal"):
        """Submit a CAPTCHA task to the queue.

        Args:
            method: CaptchaAI method name (e.g. "userrecaptcha").
            params: dict of method-specific API parameters.
            priority: "high" pushes to the head of the queue; anything
                else appends to the tail (normal FIFO order).

        Returns:
            A short task id string used to look up the result later.
        """
        task_id = str(uuid.uuid4())[:8]
        task = {
            "id": task_id,
            "method": method,
            "params": params,
            "submitted_at": time.time(),
            "priority": priority,
        }

        # Workers pop from the LEFT (blpop), so lpush = front of the line.
        if priority == "high":
            self.redis.lpush(self.queue_key, json.dumps(task))
        else:
            self.redis.rpush(self.queue_key, json.dumps(task))

        return task_id

    def fetch(self, timeout=30):
        """Fetch the next task from the queue (blocking).

        Returns the task dict, or None if no task arrived within `timeout`
        seconds.
        """
        result = self.redis.blpop(self.queue_key, timeout=timeout)
        if result is None:
            return None
        _, raw = result
        return json.loads(raw)

    def store_result(self, task_id, result):
        """Store a task result (expires after RESULT_TTL) and notify listeners.

        setex gives the result a real TTL so the result store cannot grow
        without bound as tasks complete; a separate counter keeps the
        completed-task statistic after results expire.
        """
        self.redis.setex(
            self._result_key(task_id),
            self.RESULT_TTL,
            json.dumps(result),
        )
        self.redis.incr(self.completed_key)
        # Publish last so subscribers reacting to the notification can
        # already read the stored result.
        self.redis.publish(self.notify_channel, task_id)

    def get_result(self, task_id):
        """Get the result dict for a task, or None if absent or expired."""
        raw = self.redis.get(self._result_key(task_id))
        if raw:
            return json.loads(raw)
        return None

    def wait_result(self, task_id, timeout=120):
        """Poll for a task result; returns None after `timeout` seconds."""
        start = time.time()
        while time.time() - start < timeout:
            result = self.get_result(task_id)
            if result:
                return result
            time.sleep(1)
        return None

    def queue_stats(self):
        """Get queue statistics: pending tasks and total completed tasks."""
        return {
            "pending": self.redis.llen(self.queue_key),
            "completed": int(self.redis.get(self.completed_key) or 0),
        }

Worker Process

import os
import time
import requests


class QueueWorker:
    """Worker that processes CAPTCHA tasks from a Redis queue.

    Pulls tasks with a blocking pop, submits them to the CaptchaAI API,
    polls for the answer, and writes the outcome back to the queue's
    result store.
    """

    POLL_INTERVAL = 5  # seconds between res.php polls

    def __init__(self, api_key, queue):
        self.api_key = api_key
        self.queue = queue
        self.base = "https://ocr.captchaai.com"

    def run(self):
        """Main worker loop: fetch -> solve -> store result. Never returns."""
        worker_id = os.getpid()
        print(f"Worker {worker_id} started")

        while True:
            # A transient Redis hiccup must not kill the worker process;
            # back off briefly and keep polling.
            try:
                task = self.queue.fetch(timeout=30)
            except Exception as e:
                print(f"[{worker_id}] queue fetch failed: {e}")
                time.sleep(5)
                continue

            if task is None:
                continue

            task_id = task["id"]
            print(f"[{worker_id}] Processing {task_id}")

            start = time.time()
            try:
                token = self._solve(task["method"], task["params"])
                duration = time.time() - start
                self.queue.store_result(task_id, {
                    "status": "success",
                    "token": token,
                    "duration": f"{duration:.1f}s",
                })
                print(f"[{worker_id}] {task_id} solved in {duration:.1f}s")

            except Exception as e:
                # Record the failure so producers waiting on this task
                # get an answer instead of timing out.
                self.queue.store_result(task_id, {
                    "status": "error",
                    "error": str(e),
                })
                print(f"[{worker_id}] {task_id} failed: {e}")

    def _solve(self, method, params, timeout=120):
        """Submit one CAPTCHA to CaptchaAI and poll until solved.

        Returns the solved token string. Raises RuntimeError on an API
        error response and TimeoutError if no answer arrives in `timeout`
        seconds.
        """
        resp = requests.post(f"{self.base}/in.php", data={
            "key": self.api_key,
            "method": method,
            "json": 1,
            **params,
        }, timeout=30)
        # Fail with a clear HTTP error instead of a JSON decode error
        # when the API returns a 4xx/5xx page.
        resp.raise_for_status()
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(self.POLL_INTERVAL)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            resp.raise_for_status()
            data = resp.json()
            # "CAPCHA_NOT_READY" is the literal status string the API
            # returns while solving is still in progress.
            if data.get("request") != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data.get("request"))

        raise TimeoutError("Solve timeout")


# Worker entry point: build the queue and worker, then loop forever.
if __name__ == "__main__":
    api_key = os.environ["CAPTCHAAI_KEY"]
    task_queue = CaptchaQueue()
    QueueWorker(api_key, task_queue).run()

Multi-Worker Launcher

import multiprocessing
import os


def start_workers(num_workers=4):
    """Launch `num_workers` CAPTCHA worker processes.

    Each worker process builds its own CaptchaQueue (and hence its own
    Redis connection) inside run_worker — connections should not be
    shared across processes — so no queue is created here.

    Returns:
        List of started multiprocessing.Process handles, so the caller
        can join() or terminate() them.
    """
    # Read the key once, up front: fail fast with a clear KeyError
    # before any process is spawned.
    api_key = os.environ["CAPTCHAAI_KEY"]
    processes = []

    for i in range(num_workers):
        p = multiprocessing.Process(
            target=run_worker,
            args=(api_key,),
        )
        p.start()
        processes.append(p)
        print(f"Started worker {i + 1}/{num_workers}")

    return processes


def run_worker(api_key):
    """Process entry point: run a worker with its own Redis connection."""
    worker = QueueWorker(api_key, CaptchaQueue())
    worker.run()


# Launch — guarded so that child processes, which re-import this module
# under the spawn/forkserver start methods (the default on Windows and
# macOS), do not recursively spawn more workers.
if __name__ == "__main__":
    processes = start_workers(num_workers=4)

Producer Example

queue = CaptchaQueue()

# One reCAPTCHA task per target page
pages = [
    "https://site1.com/login",
    "https://site2.com/register",
    "https://site3.com/checkout",
]

task_ids = []
for page_url in pages:
    task_params = {
        "googlekey": "SITE_KEY",
        "pageurl": page_url,
    }
    tid = queue.submit("userrecaptcha", task_params)
    task_ids.append(tid)
    print(f"Submitted {tid} for {page_url}")

# Block (polling) until every task resolves or times out
for tid in task_ids:
    outcome = queue.wait_result(tid, timeout=120)
    status = outcome["status"] if outcome else "timeout"
    print(f"{tid}: {status}")

# Check queue stats
print(queue.queue_stats())

Pub/Sub Result Listener

Get notified immediately when tasks complete:

import threading


def listen_results(queue):
    """Listen for completed-task notifications and print each outcome.

    Blocks forever on the pub/sub stream. The published payload is the
    task id; the result itself is fetched from the result store.
    """
    pubsub = queue.redis.pubsub()
    pubsub.subscribe(queue.notify_channel)

    for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe confirmations etc.
        task_id = message["data"].decode()
        result = queue.get_result(task_id)
        if result is None:
            # The result can expire (TTL) or be missing by the time we
            # read it; don't crash the listener on that race.
            print(f"Task {task_id} completed: result unavailable")
            continue
        print(f"Task {task_id} completed: {result['status']}")


# Run the listener on a background daemon thread so it never blocks the
# main program and exits with it.
notification_queue = CaptchaQueue()
background_listener = threading.Thread(
    target=listen_results,
    args=(notification_queue,),
    daemon=True,
)
background_listener.start()

Troubleshooting

Issue Cause Fix
Workers idle with tasks in queue Connection to wrong Redis Verify REDIS_URL
Results disappear No TTL management Use setex for result expiry
Queue grows unbounded Workers too slow Add more workers or increase concurrency
Duplicate processing Task popped but worker crashed Use brpoplpush for reliable queue

FAQ

Why Redis over other message queues?

Redis is simple, fast, and most teams already run it. For complex routing or guaranteed delivery, consider RabbitMQ instead.

How many workers per Redis instance?

A single Redis instance handles 100k+ operations/second. The bottleneck is CaptchaAI API throughput, not Redis. Plan workers based on your CaptchaAI capacity.

Should I use Redis Streams instead of Lists?

Redis Streams provide consumer groups and acknowledgment, which is better for production. Lists work well for simpler setups.



Distribute your workload — get CaptchaAI today.

Discussions (0)

No comments yet.

Related Posts

DevOps & Scaling Ansible Playbooks for CaptchaAI Worker Deployment
Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates, and health checks across your server fleet.

Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Blue-Green Deployment for CAPTCHA Solving Infrastructure
Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switching, and rollback strategies with Captcha AI.

Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switchin...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Auto-Scaling CAPTCHA Solving Workers
Build auto-scaling CAPTCHA solving workers that adjust capacity based on queue depth, balance, and solve rates.

Build auto-scaling CAPTCHA solving workers that adjust capacity based on queue depth, balance, and solve rates...

Automation Python All CAPTCHA Types
Mar 23, 2026
DevOps & Scaling CaptchaAI Monitoring with Datadog: Metrics and Alerts
Monitor Captcha AI performance with Datadog — custom metrics, dashboards, anomaly detection alerts, and solve rate tracking for CAPTCHA solving pipelines.

Monitor Captcha AI performance with Datadog — custom metrics, dashboards, anomaly detection alerts, and solve...

Automation Python All CAPTCHA Types
Feb 19, 2026
DevOps & Scaling OpenTelemetry Tracing for CAPTCHA Solving Pipelines
Instrument CAPTCHA solving pipelines with Open Telemetry — distributed traces, spans for submit/poll phases, and vendor-neutral observability with Captcha AI.

Instrument CAPTCHA solving pipelines with Open Telemetry — distributed traces, spans for submit/poll phases, a...

Automation Python All CAPTCHA Types
Mar 07, 2026
DevOps & Scaling CaptchaAI Behind a Load Balancer: Architecture Patterns
Architect CAPTCHA solving workers behind a load balancer — routing strategies, health checks, sticky sessions, and scaling patterns with Captcha AI.

Architect CAPTCHA solving workers behind a load balancer — routing strategies, health checks, sticky sessions,...

Automation Python All CAPTCHA Types
Feb 24, 2026
DevOps & Scaling Rolling Updates for CAPTCHA Solving Worker Fleets
Implement rolling updates for CAPTCHA solving worker fleets — zero-downtime upgrades, graceful draining, health-gated progression, and automatic rollback.

Implement rolling updates for CAPTCHA solving worker fleets — zero-downtime upgrades, graceful draining, healt...

Automation Python All CAPTCHA Types
Feb 28, 2026
DevOps & Scaling CaptchaAI Monitoring with New Relic: APM Integration
Integrate Captcha AI with New Relic APM — custom events, transaction tracing, dashboards, and alert policies for CAPTCHA solving performance.

Integrate Captcha AI with New Relic APM — custom events, transaction tracing, dashboards, and alert policies f...

Automation Python All CAPTCHA Types
Jan 31, 2026
DevOps & Scaling High Availability CAPTCHA Solving: Failover and Redundancy
Build a high-availability CAPTCHA solving system — automatic failover, health checks, redundant workers, and graceful degradation with Captcha AI.

Build a high-availability CAPTCHA solving system — automatic failover, health checks, redundant workers, and g...

Automation Python All CAPTCHA Types
Mar 27, 2026
DevOps & Scaling GitHub Actions + CaptchaAI: CI/CD CAPTCHA Testing
Integrate Captcha AI with Git Hub Actions for automated CAPTCHA testing in CI/CD pipelines.

Integrate Captcha AI with Git Hub Actions for automated CAPTCHA testing in CI/CD pipelines. Test flows, verify...

Python reCAPTCHA v2 Testing
Feb 04, 2026
DevOps & Scaling Docker + CaptchaAI: Containerized CAPTCHA Solving
Run Captcha AI integrations in Docker containers.

Run Captcha AI integrations in Docker containers. Dockerfile, environment variables, multi-stage builds, and D...

Automation Python All CAPTCHA Types
Mar 09, 2026
DevOps & Scaling Building Custom CaptchaAI Alerts with PagerDuty
Integrate Captcha AI with Pager Duty for incident management — trigger alerts on low balance, high error rates, and pipeline failures with escalation policies.

Integrate Captcha AI with Pager Duty for incident management — trigger alerts on low balance, high error rates...

Automation Python All CAPTCHA Types
Jan 15, 2026