CAPTCHA Solving Architecture Patterns for High Volume

Patterns for reliably solving anywhere from a few hundred to 100,000+ CAPTCHAs per hour.


Pattern 1: Simple Worker Pool

Best for: 100–1,000 solves/hour.

┌──────────┐     ┌──────────────┐     ┌────────────┐
│  Scraper  │────▶│  Thread Pool │────▶│ CaptchaAI  │
│  Tasks    │     │  (5-20       │     │   API      │
│           │◀────│   workers)   │◀────│            │
└──────────┘     └──────────────┘     └────────────┘
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests


class SimpleWorkerPool:
    def __init__(self, api_key, max_workers=10):
        self.api_key = api_key
        self.max_workers = max_workers
        self.base = "https://ocr.captchaai.com"

    def _solve_one(self, params):
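        # Copy so the caller's dict is not mutated.
        params = {**params, "key": self.api_key, "json": 1}

        # Submit the task; a timeout keeps a stalled connection from tying up a worker.
        resp = requests.post(f"{self.base}/in.php", data=params, timeout=30).json()
        if resp["status"] != 1:
            return {"error": resp["request"]}

        task_id = resp["request"]
        time.sleep(10)  # initial wait before the first poll

        # Poll every 5 s, up to 60 times (about 5 minutes).
        for _ in range(60):
            result = requests.get(
                f"{self.base}/res.php",
                params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                timeout=30,
            ).json()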

            if result["request"] == "CAPCHA_NOT_READY":
                time.sleep(5)
                continue
            if result["status"] == 1:
                return {"token": result["request"]}
            return {"error": result["request"]}

        return {"error": "timeout"}

    def solve_batch(self, tasks):
        """tasks: list of (identifier, params) tuples."""
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {
                pool.submit(self._solve_one, params): ident
                for ident, params in tasks
            }
            for future in as_completed(futures):
                ident = futures[future]
                try:
                    results[ident] = future.result()
                except Exception as e:
                    results[ident] = {"error": str(e)}
        return results
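
The pool above returns an error dict instead of raising, so a transient failure is recorded as final. One option is a small retry wrapper with exponential backoff. This is a minimal sketch, assuming the `{"token": ...}` / `{"error": ...}` result shape used by `_solve_one`; the name `solve_with_retry`, the set of retryable errors, and the delay values are illustrative assumptions, not part of the CaptchaAI API.

```python
import time


# Error values assumed to be transient. "timeout" is what _solve_one returns
# when polling runs out; ERROR_NO_SLOT_AVAILABLE signals a busy service.
RETRYABLE = {"timeout", "ERROR_NO_SLOT_AVAILABLE"}


def solve_with_retry(solve_fn, params, max_attempts=3, base_delay=2.0, sleep=time.sleep):
    """Call solve_fn(params) until it returns a token or a non-retryable error.

    solve_fn must return {"token": ...} or {"error": ...}.
    The delay doubles after each failed attempt: base_delay, 2x, 4x, ...
    """
    result = {"error": "no attempts made"}
    for attempt in range(max_attempts):
        result = solve_fn(dict(params))  # copy so each retry starts from clean params
        if "token" in result or result.get("error") not in RETRYABLE:
            return result
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return result
```

Inside `solve_batch`, submitting `solve_with_retry` with `self._solve_one` as its first argument would apply this to every task.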

Pattern 2: Queue-Based Pipeline

Best for: 1,000–10,000 solves/hour with back-pressure control.

┌────────┐     ┌───────────┐     ┌──────────┐     ┌───────────┐     ┌────────┐
│Producer│────▶│ Submit    │────▶│ Pending  │────▶│  Poll     │────▶│Results │
│        │     │ Queue     │     │ Queue    │     │  Workers  │     │ Queue  │
└────────┘     └───────────┘     └──────────┘     └───────────┘     └────────┘
import queue
import threading
import time
import requests


class QueuePipeline:
    def __init__(self, api_key, submit_workers=5, poll_workers=10):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.submit_queue = queue.Queue(maxsize=100)
        self.pending_queue = queue.Queue()
        self.results = {}
        self.results_lock = threading.Lock()
        self._running = False
        self.submit_workers = submit_workers
        self.poll_workers = poll_workers

    def start(self):
        self._running = True
        for _ in range(self.submit_workers):
            threading.Thread(target=self._submit_worker, daemon=True).start()
        for _ in range(self.poll_workers):
            threading.Thread(target=self._poll_worker, daemon=True).start()

    def stop(self):
        self._running = False

    def add(self, ident, params):
        self.submit_queue.put((ident, params))

    def get_result(self, ident, timeout=300):
        deadline = time.time() + timeout
        while time.time() < deadline:
            with self.results_lock:
                if ident in self.results:
                    return self.results.pop(ident)
            time.sleep(1)
        return {"error": "timeout"}

    def _submit_worker(self):
        while self._running:
            try:
                ident, params = self.submit_queue.get(timeout=1)
            except queue.Empty:
                continue
            # Copy so the caller's dict is not mutated.
            params = {**params, "key": self.api_key, "json": 1}
            try:
                resp = requests.post(f"{self.base}/in.php", data=params, timeout=30).json()
                if resp["status"] == 1:
                    self.pending_queue.put((ident, resp["request"], time.time()))
                else:
                    with self.results_lock:
                        self.results[ident] = {"error": resp["request"]}
            except Exception as e:
                with self.results_lock:
                    self.results[ident] = {"error": str(e)}

    def _poll_worker(self):
        while self._running:
            try:
                ident, task_id, submitted_at = self.pending_queue.get(timeout=1)
            except queue.Empty:
                continue

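            # Give up on tasks pending longer than get_result's default timeout,
            # so a permanently failing task is not re-queued forever.
            if time.time() - submitted_at > 300:
                with self.results_lock:
                    self.results[ident] = {"error": "timeout"}
                continue

            # Wait at least 10s from submission
            wait = 10 - (time.time() - submitted_at)
            if wait > 0:
                time.sleep(wait)

            try:
                resp = requests.get(
                    f"{self.base}/res.php",
                    params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                    timeout=30,
                ).json()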

                if resp["request"] == "CAPCHA_NOT_READY":
                    self.pending_queue.put((ident, task_id, submitted_at))
                    time.sleep(3)
                elif resp["status"] == 1:
                    with self.results_lock:
                        self.results[ident] = {"token": resp["request"]}
                else:
                    with self.results_lock:
                        self.results[ident] = {"error": resp["request"]}
            except Exception:
                self.pending_queue.put((ident, task_id, submitted_at))
                time.sleep(5)

Usage:

pipeline = QueuePipeline("YOUR_API_KEY")
pipeline.start()

# Add CAPTCHAs
pipeline.add("page_1", {"method": "turnstile", "sitekey": "KEY", "pageurl": "URL"})
pipeline.add("page_2", {"method": "userrecaptcha", "googlekey": "KEY", "pageurl": "URL"})

# Get results
result1 = pipeline.get_result("page_1")
result2 = pipeline.get_result("page_2")

pipeline.stop()
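
`add` blocks once `submit_queue` holds 100 items, which is where the back-pressure comes from. If a producer should shed load rather than stall, a timed put is one option. The sketch below uses only the standard library; `try_add` is a hypothetical helper, not a method of `QueuePipeline`.

```python
import queue


def try_add(submit_queue, ident, params, timeout=0.1):
    """Enqueue a task, giving up if the queue stays full for `timeout` seconds.

    Returns True if the task was queued, False if the queue was full.
    """
    try:
        submit_queue.put((ident, params), timeout=timeout)
        return True
    except queue.Full:
        return False


# Demo with a tiny bounded queue and no consumer draining it.
q = queue.Queue(maxsize=2)
accepted = [try_add(q, f"page_{i}", {"method": "turnstile"}) for i in range(3)]
print(accepted)  # [True, True, False]
```

A rejected task can be retried later, logged, or dropped, depending on how much delay the producer can tolerate.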

Pattern 3: Circuit Breaker

Prevents cascading failures when the solving API is degraded.

import time
import threading


class CircuitBreaker:
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing — reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.lock = threading.Lock()

    def can_proceed(self):
        with self.lock:
            if self.state == self.CLOSED:
                return True
            if self.state == self.OPEN:
                if time.time() - self.last_failure_time > self.reset_timeout:
                    self.state = self.HALF_OPEN
                    return True
                return False
            # HALF_OPEN: the single probe request is already in flight;
            # reject others until it reports success or failure.
            return False

    def record_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = self.CLOSED

    def record_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = self.OPEN


class ResilientSolver:
    def __init__(self, api_key):
        self.api_key = api_key
        self.breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)

    def solve(self, params):
        if not self.breaker.can_proceed():
            raise Exception("Circuit open — API degraded, try later")

        try:
            result = self._do_solve(params)
            self.breaker.record_success()
            return result
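        except Exception:
            self.breaker.record_failure()
            raise

    def _do_solve(self, params):
        # Placeholder for the standard submit-and-poll logic (see Pattern 1).
        # It must raise on failure so the breaker can count it.
        raise NotImplementedError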

Pattern 4: Pre-Solving with Token Buffer

Maintain a buffer of pre-solved tokens for instant use.

import queue
import threading
import time


class TokenBuffer:
    def __init__(self, solver, params, buffer_size=5, ttl_seconds=90):
        self.solver = solver
        self.params = params
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.ttl = ttl_seconds
        self.buffer_size = buffer_size
        self._running = False

    def start(self):
        self._running = True
        threading.Thread(target=self._fill_loop, daemon=True).start()

    def stop(self):
        self._running = False

    def get_token(self, timeout=30):
        """Return a fresh token, or None if none arrives within `timeout` seconds."""
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            try:
                token, created_at = self.buffer.get(timeout=remaining)
            except queue.Empty:
                return None
            if time.time() - created_at <= self.ttl:
                return token
            # Token expired while buffered: discard it and try the next one.

    def _fill_loop(self):
        while self._running:
            if self.buffer.qsize() < self.buffer_size:
                try:
                    token = self.solver.solve(self.params)
                    self.buffer.put((token, time.time()))
                except Exception:
                    time.sleep(5)
            else:
                time.sleep(2)

Pattern 5: Multi-Provider Failover

Route to backup providers when primary fails.

class MultiProviderSolver:
    def __init__(self, providers):
        """providers: list of (name, solver_instance, priority) tuples."""
        self.providers = sorted(providers, key=lambda x: x[2])
        self.breakers = {name: CircuitBreaker() for name, _, _ in providers}

    def solve(self, params):
        errors = []
        for name, solver, _ in self.providers:
            if not self.breakers[name].can_proceed():
                continue
            try:
                result = solver.solve(params)
                self.breakers[name].record_success()
                return result
            except Exception as e:
                self.breakers[name].record_failure()
                errors.append(f"{name}: {e}")
        # If every breaker was open, no provider was tried and errors is empty.
        raise Exception(f"All providers failed: {'; '.join(errors) or 'all circuits open'}")

Scaling Guidelines

| Volume | Architecture | Workers | Notes |
|---|---|---|---|
| < 100/hr | Direct calls | 1-3 | No special architecture needed |
| 100-1K/hr | Worker pool | 5-10 | Pattern 1 |
| 1K-10K/hr | Queue pipeline | 10-30 | Pattern 2 + circuit breaker |
| 10K-50K/hr | Distributed queues | 30-100 | Redis/RabbitMQ, multiple machines |
| 50K+/hr | Multi-provider | 100+ | Pattern 5 + distributed queue |
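
The worker counts above follow roughly from Little's law: tasks in flight equal the arrival rate multiplied by the time each task spends in the system. The sketch below shows the arithmetic, assuming an average solve time of 30 seconds. That figure is illustrative, not a measured one, and `required_in_flight` is a hypothetical helper.

```python
import math


def required_in_flight(solves_per_hour, avg_solve_seconds):
    """Little's law: concurrent tasks = arrival rate (per second) * time in system."""
    return math.ceil(solves_per_hour * avg_solve_seconds / 3600)


for volume in (100, 1_000, 10_000):
    print(volume, required_in_flight(volume, avg_solve_seconds=30))
# 100 1
# 1000 9
# 10000 84
```

In Pattern 1 each in-flight solve occupies one thread, so about 9 workers cover 1,000 solves per hour under this assumption. In Pattern 2 a poll worker cycles through many pending tasks, which is why the table lists fewer workers than tasks in flight.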

Monitoring at Scale

import logging
import threading
from collections import defaultdict

logger = logging.getLogger("captcha_scale")


class ScaleMetrics:
    def __init__(self):
        self.counts = defaultdict(int)
        self.times = defaultdict(list)
        self.lock = threading.Lock()  # record() is called from many worker threads

    def record(self, captcha_type, success, elapsed):
        key = f"{captcha_type}_{'ok' if success else 'fail'}"
        with self.lock:
            self.counts[key] += 1
            self.times[captcha_type].append(elapsed)

    def report(self):
        for captcha_type in set(k.rsplit("_", 1)[0] for k in self.counts):
            ok = self.counts.get(f"{captcha_type}_ok", 0)
            fail = self.counts.get(f"{captcha_type}_fail", 0)
            total = ok + fail
            rate = (ok / total * 100) if total else 0
            times = self.times.get(captcha_type, [])
            avg_time = sum(times) / len(times) if times else 0
            logger.info(
                f"{captcha_type}: {total} solves, {rate:.1f}% success, {avg_time:.1f}s avg"
            )

FAQ

How many concurrent solves can CaptchaAI handle?

CaptchaAI can handle high concurrency. Start with 10-20 concurrent requests and raise the number gradually, backing off if solve times climb or ERROR_NO_SLOT_AVAILABLE errors start to appear.

Should I use async or threads?

Threads work well since CAPTCHA solving is I/O-bound (network requests + waiting). For Python 3.7+, asyncio with aiohttp is also excellent for very high concurrency.
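
For the asyncio route, a semaphore plays the role of `max_workers`. The sketch below uses only the standard library. `fake_solve` stands in for a real submit-and-poll coroutine (which would use an async HTTP client such as aiohttp), and all names are illustrative.

```python
import asyncio


async def fake_solve(ident):
    """Placeholder for an async submit-and-poll call; sleeps instead of using the network."""
    await asyncio.sleep(0.01)
    return {"token": f"token-for-{ident}"}


async def solve_all(idents, max_concurrent=20):
    """Solve every task, never running more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(ident):
        async with semaphore:
            try:
                return ident, await fake_solve(ident)
            except Exception as e:  # mirror the thread pool: report, don't abort the batch
                return ident, {"error": str(e)}

    pairs = await asyncio.gather(*(bounded(i) for i in idents))
    return dict(pairs)


results = asyncio.run(solve_all([f"page_{n}" for n in range(50)], max_concurrent=10))
print(len(results))  # 50
```

Because waiting tasks are coroutines rather than threads, thousands can be pending at once while the semaphore still caps how many are talking to the API.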

How do I prevent overwhelming the API?

Use a queue with bounded size (back-pressure), rate limiting, and circuit breakers. Monitor your ERROR_NO_SLOT_AVAILABLE rate.
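
Bounded queues and circuit breakers appear in the patterns above; rate limiting does not. One common approach is a token bucket shared by all submit workers. This is a sketch under assumptions: the class name, rate, and burst size are illustrative, and CaptchaAI's actual limits are not stated here.

```python
import threading
import time


class RateLimiter:
    """Thread-safe token bucket: allows `rate` calls per second, bursts up to `burst`."""

    def __init__(self, rate, burst):
        self.rate = rate
        self.capacity = burst
        self.tokens = float(burst)
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def try_acquire(self):
        """Take one token if available. Returns True on success, False otherwise."""
        with self.lock:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def acquire(self):
        """Block until a token is available."""
        while not self.try_acquire():
            time.sleep(1 / self.rate)
```

Calling `limiter.acquire()` before each POST to `in.php` in `_submit_worker` would cap the submission rate across all threads.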



Scale to any volume — start with CaptchaAI.
