Backpressure Handling in CAPTCHA Solving Queues

When your scraper finds CAPTCHAs faster than CaptchaAI can solve them, an unbounded queue grows until memory runs out. Backpressure is the mechanism that slows down producers when consumers can't keep up.

The Backpressure Problem

Without backpressure:
[Scraper] → 100 tasks/sec → [Queue: ∞] → [Solver: 10/sec] → Memory exhaustion

With backpressure:
[Scraper] → 100 tasks/sec → [Queue: max 50] → [Solver: 10/sec]
                 ↑                                      ↓
                 └──── "slow down" signal ──────────────┘

If your scraper produces tasks at 100/sec but CaptchaAI solves at 10/sec, the queue grows by 90 tasks every second — just over 11 seconds to accumulate 1,000 pending tasks.
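The arithmetic is worth spelling out, since it tells you how much time you have before memory pressure hits. A worked example of the rates above:

```python
# Worked example of the rates quoted above (100/sec in, 10/sec out).
produce_rate = 100   # tasks/sec from the scraper
solve_rate = 10      # tasks/sec through the solver
growth = produce_rate - solve_rate          # 90 tasks pile up each second

seconds_to_1000 = 1000 / growth
print(f"{seconds_to_1000:.1f}s to reach 1,000 pending tasks")  # 11.1s
```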

Pattern 1: Bounded Queue with Blocking

The simplest approach — a queue with a maximum size. Producers block when the queue is full.

Python

import os
import time
import queue
import threading
import requests

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

# Bounded queue — blocks when full
task_queue = queue.Queue(maxsize=50)
results = {}


def producer(tasks):
    """Scraper adds tasks. Blocks when queue is full."""
    for task in tasks:
        # This blocks if queue has 50 pending items
        task_queue.put(task)
        print(f"Queued: {task['id']} (queue size: {task_queue.qsize()})")


def worker():
    """Consumer — solves CAPTCHAs from the queue."""
    while True:
        task = task_queue.get()
        if task is None:
            break

        try:
            resp = requests.post("https://ocr.captchaai.com/in.php", data={
                "key": API_KEY,
                "method": "userrecaptcha",
                "googlekey": task["sitekey"],
                "pageurl": task["pageurl"],
                "json": 1
            }, timeout=30)
            data = resp.json()

            if data.get("status") == 1:
                captcha_id = data["request"]
                solution = poll_result(captcha_id)
                results[task["id"]] = solution
            else:
                # Record submit failures so tasks aren't silently lost
                results[task["id"]] = f"ERROR: {data.get('request')}"
        except Exception as e:
            results[task["id"]] = f"ERROR: {e}"
        finally:
            task_queue.task_done()


def poll_result(captcha_id):
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1
        }, timeout=30).json()
        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            return f"ERROR: {result.get('request')}"
    return "ERROR: TIMEOUT"


# Start 5 worker threads
workers = []
for _ in range(5):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)

# Producer populates queue — naturally slows when queue is full
tasks = [
    {"id": f"t_{i}", "sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
     "pageurl": f"https://example.com/{i}"}
    for i in range(200)
]

producer(tasks)
task_queue.join()  # Wait for all tasks to complete
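The worker exits when it receives None, but the script above never sends that sentinel; it relies on daemon threads dying with the process after task_queue.join(). For an explicit clean shutdown, push one None per worker once the real work is done. A self-contained sketch of just the queue mechanics (no API calls):

```python
import queue
import threading

task_queue = queue.Queue(maxsize=50)
done = []

def worker():
    while True:
        task = task_queue.get()
        if task is None:            # sentinel: exit the loop
            task_queue.task_done()
            break
        done.append(task)           # stand-in for solving the CAPTCHA
        task_queue.task_done()

workers = [threading.Thread(target=worker) for _ in range(5)]
for t in workers:
    t.start()

for i in range(20):
    task_queue.put(i)
task_queue.join()                   # every real task processed

for _ in workers:                   # one sentinel per worker thread
    task_queue.put(None)
for t in workers:
    t.join()                        # all workers exited cleanly

print(len(done))  # 20
```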

JavaScript

class BoundedQueue {
  constructor(maxSize) {
    this.maxSize = maxSize;
    this.queue = [];
    this.waiters = [];
    this.consumers = [];
  }

  async put(item) {
    while (this.queue.length >= this.maxSize) {
      // Wait until space is available
      await new Promise((resolve) => this.waiters.push(resolve));
    }
    this.queue.push(item);

    // Wake up a waiting consumer
    if (this.consumers.length > 0) {
      this.consumers.shift()();
    }
  }

  async get() {
    while (this.queue.length === 0) {
      await new Promise((resolve) => this.consumers.push(resolve));
    }
    const item = this.queue.shift();

    // Wake up a waiting producer
    if (this.waiters.length > 0) {
      this.waiters.shift()();
    }
    return item;
  }

  get size() {
    return this.queue.length;
  }
}

const taskQueue = new BoundedQueue(50);
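In Python you don't have to hand-roll this class: queue.Queue(maxsize=...) blocks threads on put(), and asyncio.Queue(maxsize=...) suspends coroutines the same way the JavaScript version above awaits. A minimal self-contained sketch of the asyncio variant:

```python
import asyncio

async def main():
    q = asyncio.Queue(maxsize=2)       # put() suspends while 2 items queued

    async def producer():
        for i in range(5):
            await q.put(i)             # backpressure: awaits when full
        await q.put(None)              # sentinel: no more items

    async def consumer():
        out = []
        while True:
            item = await q.get()
            if item is None:
                return out
            out.append(item)

    _, items = await asyncio.gather(producer(), consumer())
    return items

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```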

Pattern 2: Load Shedding

When the queue is full, drop low-priority tasks instead of blocking.

Python

class LoadSheddingQueue:
    def __init__(self, maxsize, drop_callback=None):
        self.queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        self.drop_callback = drop_callback

    def put(self, task, priority="normal"):
        try:
            self.queue.put_nowait(task)
        except queue.Full:
            if priority == "high":
                # Drop oldest normal-priority task to make room
                try:
                    dropped = self.queue.get_nowait()
                    self.queue.put_nowait(task)
                    self.dropped += 1
                    if self.drop_callback:
                        self.drop_callback(dropped)
                except queue.Empty:
                    pass
            else:
                # Drop this task
                self.dropped += 1
                if self.drop_callback:
                    self.drop_callback(task)

    def get(self):
        return self.queue.get()

    @property
    def stats(self):
        return {
            "queued": self.queue.qsize(),
            "dropped": self.dropped
        }


def on_drop(task):
    print(f"DROPPED: {task['id']} (queue full)")

shed_queue = LoadSheddingQueue(maxsize=50, drop_callback=on_drop)
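The class builds on queue.Queue's non-blocking put_nowait, which raises queue.Full instead of waiting; that exception is the shed signal. The core mechanic in isolation, with a small maxsize for brevity:

```python
import queue

q = queue.Queue(maxsize=3)
dropped = []

for i in range(5):
    try:
        q.put_nowait(i)        # succeeds while there is room
    except queue.Full:
        dropped.append(i)      # shed: record and move on instead of blocking

print(q.qsize(), dropped)  # 3 [3, 4]
```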

Pattern 3: Adaptive Concurrency

Automatically adjust worker count based on queue depth and error rates:

Python

import queue
import threading
import time


class AdaptiveSolver:
    def __init__(self, api_key, min_workers=2, max_workers=20):
        self.api_key = api_key
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.task_queue = queue.Queue(maxsize=100)
        self.active_workers = 0
        self.error_count = 0
        self.success_count = 0
        self.lock = threading.Lock()
        self.workers = []

    def _adjust_workers(self):
        """Scale workers based on queue depth and error rate."""
        while True:
            time.sleep(10)  # Check every 10 seconds

            queue_depth = self.task_queue.qsize()
            total = self.success_count + self.error_count
            error_rate = self.error_count / max(total, 1)

            target_workers = self.active_workers

            if queue_depth > 30 and error_rate < 0.1:
                # Queue building up, errors low — scale up
                target_workers = min(
                    self.active_workers + 2,
                    self.max_workers
                )
            elif queue_depth < 5 or error_rate > 0.2:
                # Queue draining or errors high — scale down
                target_workers = max(
                    self.active_workers - 1,
                    self.min_workers
                )

            if target_workers != self.active_workers:
                self._set_worker_count(target_workers)
                print(f"Adjusted workers: {self.active_workers} → {target_workers} "
                      f"(queue={queue_depth}, err_rate={error_rate:.1%})")

    def _set_worker_count(self, target):
        """Add or remove workers to reach target count."""
        while self.active_workers < target:
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
            self.active_workers += 1

        # Note: removing workers requires a stop signal mechanism

    def _worker(self):
        while True:
            task = self.task_queue.get()
            try:
                solve_captcha(task)  # submit + poll, as in Pattern 1's worker
                with self.lock:
                    self.success_count += 1
            except Exception:
                with self.lock:
                    self.error_count += 1
            finally:
                self.task_queue.task_done()

    def start(self):
        """Start minimum workers + adjuster thread."""
        self._set_worker_count(self.min_workers)
        adjuster = threading.Thread(target=self._adjust_workers, daemon=True)
        adjuster.start()

Pattern 4: Circuit Breaker + Backpressure

Stop submitting entirely when error rates spike:

Python

import time


class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure = 0
        self.state = "closed"  # closed=normal, open=blocking, half-open=testing

    def can_proceed(self):
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
            print(f"Circuit OPEN — pausing submissions for {self.reset_timeout}s")
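To see the three states in action, here is a self-contained walk-through: the class inlined (minus the log line) so the snippet runs on its own, using a short reset_timeout purely for demonstration:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure = 0
        self.state = "closed"

    def can_proceed(self):
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

breaker = CircuitBreaker(failure_threshold=2, reset_timeout=0.1)

assert breaker.can_proceed()       # closed: submissions flow
breaker.record_failure()
breaker.record_failure()           # threshold reached -> circuit opens
assert not breaker.can_proceed()   # open: stop submitting
time.sleep(0.15)
assert breaker.can_proceed()       # timeout elapsed -> half-open trial
breaker.record_success()           # trial succeeded -> closed again
print(breaker.state)  # closed
```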

Choosing the Right Pattern

Scenario → best pattern

Scraper produces bursts, average rate ≤ solve rate → bounded queue with blocking
Scraper consistently produces faster than solve rate → load shedding
Variable load, need auto-scaling → adaptive concurrency
External API having issues → circuit breaker
Production system → combine all patterns

Troubleshooting

Producer permanently blocked
  Cause: queue full while workers are not consuming.
  Fix: check worker health; increase worker count or queue size.

Too many tasks dropped
  Cause: queue size too small for burst patterns.
  Fix: increase queue maxsize; bursts need buffer capacity.

Adaptive scaling oscillating
  Cause: check interval too short.
  Fix: increase the adjustment interval to 30+ seconds; use smoothed metrics.

Memory still growing
  Cause: unbounded result storage.
  Fix: add a TTL to results; clean up processed results.

FAQ

What queue size should I use?

Set queue size to 2–5× your worker count. With 10 workers, a queue of 20–50 provides buffer without excessive memory usage. Larger buffers absorb bursts but delay backpressure signals.

Should I use backpressure or rate limiting?

Both. Rate limiting controls how fast you send to CaptchaAI. Backpressure controls how much work your scraper can queue. They solve different problems.
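A sketch of the two working together: the bounded queue pushes back on the producer, while a simple interval-based limiter (an illustrative helper, with an interval chosen for demonstration) paces the consumer's calls to the API:

```python
import queue
import time

task_queue = queue.Queue(maxsize=50)   # backpressure: put() blocks when full

class RateLimiter:
    """Allow at most one call per `interval` seconds (illustrative helper)."""
    def __init__(self, interval):
        self.interval = interval
        self.next_allowed = 0.0

    def wait(self):
        now = time.monotonic()
        if now < self.next_allowed:
            time.sleep(self.next_allowed - now)
        self.next_allowed = time.monotonic() + self.interval

limiter = RateLimiter(interval=0.01)   # i.e. at most 100 submits/sec

for i in range(5):
    task_queue.put(i)                  # producer side: backpressure applies here

start = time.monotonic()
while not task_queue.empty():
    limiter.wait()                     # consumer side: rate limit applies here
    task_queue.get()                   # in real code: submit to CaptchaAI
elapsed = time.monotonic() - start
print(f"drained 5 tasks in {elapsed:.3f}s")
```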

How do I monitor backpressure?

Log queue depth periodically. If it stays near max_size, your producers are consistently faster than consumers — add more workers or reduce production rate.
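A minimal monitor is a daemon thread that samples qsize() and warns near capacity. A self-contained sketch (sampling three times quickly for demonstration; a real monitor would loop forever and log every few seconds):

```python
import queue
import threading
import time

task_queue = queue.Queue(maxsize=50)
samples = []

def monitor(interval, count):
    # In production: run indefinitely and send depth to your logger or
    # metrics backend instead of a list.
    for _ in range(count):
        depth = task_queue.qsize()
        samples.append(depth)
        if depth >= task_queue.maxsize * 0.8:
            print(f"WARNING: queue at {depth}/{task_queue.maxsize}")
        time.sleep(interval)

for i in range(45):                    # simulate a backlog near capacity
    task_queue.put_nowait(i)

t = threading.Thread(target=monitor, args=(0.01, 3), daemon=True)
t.start()
t.join()
print(samples)  # [45, 45, 45]
```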

Next Steps

Build resilient CAPTCHA pipelines — get your CaptchaAI API key and implement backpressure from day one.

