Tutorials

Building a CAPTCHA Solving Queue in Python with CaptchaAI

When scraping at scale, you need more than one-at-a-time CAPTCHA solving. A queue system decouples CAPTCHA submission from result retrieval, enabling parallel solving across hundreds of tasks.


Why use a queue?

Single-request CAPTCHA solving wastes time waiting. A queue system:

  • Submits all CAPTCHAs immediately
  • Polls multiple task IDs in parallel
  • Retries failed solves automatically
  • Controls concurrency to respect API rate limits
  • Provides progress tracking and callbacks

Basic threading queue

import time
import threading
import requests
from queue import Queue, Empty

API_KEY = "YOUR_API_KEY"


class CaptchaQueue:
    """Thread-based CAPTCHA solving queue.

    Tasks are added with :meth:`submit` and processed by daemon worker
    threads started via :meth:`start`.  Each worker submits the CAPTCHA to
    the CaptchaAI HTTP API, polls for the answer, and pushes a result entry
    (a dict with ``status`` plus ``result``/``error`` and the original
    ``task``) onto an internal result queue drained by :meth:`get_results`.
    """

    def __init__(self, api_key, max_workers=10):
        self.api_key = api_key
        self.task_queue = Queue()     # pending CAPTCHA tasks
        self.result_queue = Queue()   # finished entries (solved or error)
        self.max_workers = max_workers
        self.workers = []

    def submit(self, method, callback=None, **params):
        """Add a CAPTCHA task to the queue.

        method   -- CaptchaAI method name, e.g. ``"userrecaptcha"``.
        callback -- optional callable invoked with the solved token.
        params   -- extra API parameters (googlekey, pageurl, ...).
        """
        self.task_queue.put({
            "method": method,
            "params": params,
            "callback": callback,
        })

    def start(self):
        """Start the daemon worker threads."""
        for _ in range(self.max_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)

    def wait(self):
        """Block until every submitted task has been processed."""
        self.task_queue.join()

    def get_results(self):
        """Drain and return all currently available result entries."""
        results = []
        while True:
            try:
                results.append(self.result_queue.get_nowait())
            except Empty:
                return results

    def _worker(self):
        # Run forever; the 1s timeout lets the daemon thread keep
        # re-checking the queue instead of blocking on a single get().
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except Empty:
                continue
            self._process_task(task)

    def _process_task(self, task):
        """Solve one task, record exactly one result entry, mark it done."""
        try:
            result = self._solve(task["method"], **task["params"])
        except Exception as e:
            self.result_queue.put({"status": "error", "error": str(e), "task": task})
        else:
            self.result_queue.put({"status": "solved", "result": result, "task": task})
            # Run the callback OUTSIDE the solve try-block: previously a
            # raising callback was caught by the same handler and queued a
            # second, contradictory "error" entry for a solved task.
            if task["callback"]:
                try:
                    task["callback"](result)
                except Exception as e:
                    print(f"Callback error: {e}")
        finally:
            self.task_queue.task_done()

    def _solve(self, method, **params):
        """Submit a CAPTCHA to CaptchaAI and poll until solved.

        Returns the solve token.  Raises on submit failure, on any explicit
        API error response, or after ~150s of polling without an answer.
        """
        submit = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }, timeout=30).json()

        if submit.get("status") != 1:
            raise Exception(f"Submit error: {submit.get('request')}")

        task_id = submit["request"]
        for _ in range(30):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }, timeout=30).json()
            if result.get("status") == 1:
                return result["request"]
            answer = result.get("request")
            # Fail fast on ANY explicit API error (not just UNSOLVABLE)
            # instead of burning the full polling budget.
            if isinstance(answer, str) and answer.startswith("ERROR_"):
                raise Exception(f"Solve error: {answer}")
        raise TimeoutError("Solve timed out")


# Usage: spin up five workers, fan out three reCAPTCHA jobs, then
# wait for the queue to drain and report what came back.
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()

urls_and_sitekeys = [
    ("https://example.com/page1", "SITEKEY_1"),
    ("https://example.com/page2", "SITEKEY_2"),
    ("https://example.com/page3", "SITEKEY_3"),
]
for page_url, site_key in urls_and_sitekeys:
    queue.submit("userrecaptcha", googlekey=site_key, pageurl=page_url)

queue.wait()
results = queue.get_results()
print(f"Solved {len(results)} CAPTCHAs")
for entry in results:
    detail = entry.get("result", entry.get("error", ""))
    print(f"  {entry['status']}: {detail[:50]}")

Async queue with asyncio

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"


class AsyncCaptchaQueue:
    """Async CAPTCHA solving queue with concurrency control.

    A semaphore caps how many solves are in flight at once; a whole batch
    is launched together and the tokens (or per-task exceptions) come back
    in submission order.
    """

    def __init__(self, api_key, max_concurrent=10):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def solve_batch(self, tasks):
        """Solve a batch of CAPTCHA tasks concurrently.

        Returns a list aligned with *tasks*; a failed solve shows up as an
        Exception instance in the list rather than aborting the batch.
        """
        pending = [self._solve_task(item) for item in tasks]
        self.results = await asyncio.gather(*pending, return_exceptions=True)
        return self.results

    async def _solve_task(self, task):
        # The semaphore is the only throttle: acquire a slot, solve, release.
        async with self.semaphore:
            return await self._solve(task["method"], **task["params"])

    async def _solve(self, method, **params):
        async with aiohttp.ClientSession() as http:
            # Step 1: submit the CAPTCHA and obtain a task id.
            async with http.post("https://ocr.captchaai.com/in.php", data={
                "key": self.api_key, "method": method, "json": 1, **params,
            }) as resp:
                payload = await resp.json(content_type=None)
                if payload.get("status") != 1:
                    raise Exception(f"Submit error: {payload.get('request')}")
                captcha_id = payload["request"]

            # Step 2: poll every 5s, up to 30 attempts (~150s budget).
            for _attempt in range(30):
                await asyncio.sleep(5)
                async with http.get("https://ocr.captchaai.com/res.php", params={
                    "key": self.api_key, "action": "get", "id": captcha_id, "json": 1,
                }) as resp:
                    answer = await resp.json(content_type=None)
                    if answer.get("status") == 1:
                        return answer["request"]
                    if answer.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
                        raise Exception("CAPTCHA unsolvable")

            raise TimeoutError("Solve timed out")


# Usage: solve ten reCAPTCHAs with at most five in flight, then report
# each outcome (tokens truncated, failures printed with their error).
async def main():
    queue = AsyncCaptchaQueue(API_KEY, max_concurrent=5)

    batch = [
        {
            "method": "userrecaptcha",
            "params": {
                "googlekey": f"SITEKEY_{i}",
                "pageurl": f"https://example.com/page{i}",
            },
        }
        for i in range(10)
    ]

    outcomes = await queue.solve_batch(batch)
    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            print(f"Task {idx}: ERROR — {outcome}")
        else:
            print(f"Task {idx}: {outcome[:50]}...")


asyncio.run(main())

Producer-consumer pattern

For continuous scraping workloads where pages are discovered dynamically:

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"


class ProducerConsumerQueue:
    """Continuous CAPTCHA solving with a producer-consumer pattern.

    A producer feeds tasks into a bounded ``asyncio.Queue`` (applying
    backpressure when full) while several consumers solve them
    concurrently.  ``None`` is used as a per-consumer shutdown sentinel.
    """

    def __init__(self, api_key, queue_size=100, num_consumers=5):
        self.api_key = api_key
        self.queue = asyncio.Queue(maxsize=queue_size)  # bounded: producer blocks when full
        self.num_consumers = num_consumers
        self.solved_count = 0
        self.error_count = 0
        self.running = True  # kept for external inspection; not read internally

    async def produce(self, tasks):
        """Producer: feed CAPTCHA tasks into the queue, then enqueue one
        ``None`` sentinel per consumer so every consumer terminates."""
        for task in tasks:
            await self.queue.put(task)
        for _ in range(self.num_consumers):
            await self.queue.put(None)

    async def consume(self, result_handler):
        """Consumer: solve CAPTCHAs and hand each token to *result_handler*.

        result_handler -- optional async callable ``(task, token)``;
        errors are counted and printed, never propagated.
        """
        async with aiohttp.ClientSession() as session:
            while True:
                task = await self.queue.get()
                if task is None:  # shutdown sentinel
                    self.queue.task_done()
                    break

                try:
                    result = await self._solve(session, task["method"], **task["params"])
                    self.solved_count += 1
                    if result_handler:
                        await result_handler(task, result)
                except Exception as e:
                    self.error_count += 1
                    print(f"Error: {e}")
                finally:
                    self.queue.task_done()

    async def run(self, tasks, result_handler=None):
        """Run the producer-consumer pipeline to completion."""
        producer = asyncio.create_task(self.produce(tasks))

        consumers = [
            asyncio.create_task(self.consume(result_handler))
            for _ in range(self.num_consumers)
        ]

        # The sentinels guarantee every consumer exits once tasks run out.
        await producer
        await asyncio.gather(*consumers)

        print(f"Complete: {self.solved_count} solved, {self.error_count} errors")

    async def _solve(self, session, method, **params):
        """Submit one CAPTCHA and poll for the token (~150s budget)."""
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(f"Submit: {data.get('request')}")
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
                # Fail fast on a definitive failure instead of polling the
                # full 30 rounds (consistent with the other queues above).
                if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
                    raise Exception("CAPTCHA unsolvable")
        raise TimeoutError("Timed out")


# Usage: run twenty reCAPTCHA tasks through the pipeline, printing a
# truncated token for each page as soon as it is solved.
async def handle_result(task, token):
    page = task["params"]["pageurl"]
    print(f"Solved for {page}: {token[:30]}...")


async def main():
    pipeline = ProducerConsumerQueue(API_KEY, num_consumers=5)

    captcha_tasks = [
        {
            "method": "userrecaptcha",
            "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"},
        }
        for i in range(20)
    ]

    await pipeline.run(captcha_tasks, result_handler=handle_result)


asyncio.run(main())

Priority queue

When some CAPTCHAs are more important than others:

import asyncio
from dataclasses import dataclass, field

API_KEY = "YOUR_API_KEY"


@dataclass(order=True)
class PriorityTask:
    # Only `priority` participates in ordering; the task payload is
    # excluded so PriorityQueue never tries to compare dicts.
    priority: int
    task: dict = field(compare=False)


class PriorityCaptchaQueue:
    """CAPTCHA queue with priority levels (lower number = solved first)."""

    def __init__(self, api_key, num_workers=5):
        self.api_key = api_key
        self.queue = asyncio.PriorityQueue()  # pops the lowest priority value first
        self.num_workers = num_workers
        self.results = {}  # task_id -> {"status": ..., "token"/"error": ...}

    async def submit(self, task_id, method, priority=5, **params):
        """Submit with priority (lower number = higher priority)."""
        await self.queue.put(PriorityTask(
            priority=priority,
            task={"id": task_id, "method": method, "params": params},
        ))

    async def process(self):
        """Process all queued tasks by priority and return the results dict."""
        workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]

        # Wait for queue to drain
        await self.queue.join()

        # Cancel the now-idle workers AND await them: previously they were
        # cancelled but never awaited, so their aiohttp sessions were
        # destroyed mid-coroutine instead of being closed cleanly.
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

        return self.results

    async def _worker(self):
        # aiohttp is imported lazily so the queue classes above remain
        # importable without the dependency installed.
        import aiohttp
        async with aiohttp.ClientSession() as session:
            while True:
                item = await self.queue.get()
                task = item.task
                try:
                    result = await self._solve(session, task["method"], **task["params"])
                    self.results[task["id"]] = {"status": "solved", "token": result}
                except Exception as e:
                    self.results[task["id"]] = {"status": "error", "error": str(e)}
                finally:
                    self.queue.task_done()

    async def _solve(self, session, method, **params):
        """Submit one CAPTCHA and poll for the token (~150s budget)."""
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(data.get("request"))
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
                answer = result.get("request")
                # Fail fast on explicit API errors instead of looping.
                if isinstance(answer, str) and answer.startswith("ERROR_"):
                    raise Exception(answer)
        raise TimeoutError("Solve timed out")


# Usage: mix three priority tiers and confirm high-priority work is
# pulled from the queue first by the three workers.
async def main():
    pq = PriorityCaptchaQueue(API_KEY, num_workers=3)

    # Priority 1 — checkout pages jump the line.
    await pq.submit("checkout_1", "turnstile", priority=1, sitekey="KEY", pageurl="https://shop.com/checkout")

    # Priority 5 — regular product pages.
    for i in range(5):
        await pq.submit(f"product_{i}", "userrecaptcha", priority=5, googlekey="KEY", pageurl=f"https://shop.com/p/{i}")

    # Priority 10 — info pages go last.
    for i in range(3):
        await pq.submit(f"info_{i}", "userrecaptcha", priority=10, googlekey="KEY", pageurl=f"https://shop.com/info/{i}")

    outcome_map = await pq.process()
    for tid, outcome in outcome_map.items():
        print(f"{tid}: {outcome['status']}")


asyncio.run(main())

Monitoring and reporting

import time
from dataclasses import dataclass, field


@dataclass
class QueueMetrics:
    """Running counters and derived statistics for a CAPTCHA solving queue."""

    submitted: int = 0
    solved: int = 0
    failed: int = 0
    total_solve_time: float = 0.0
    start_time: float = field(default_factory=time.time)

    @property
    def avg_solve_time(self):
        # Mean seconds per successful solve; 0 before the first success.
        if not self.solved:
            return 0
        return self.total_solve_time / self.solved

    @property
    def success_rate(self):
        # Percentage of finished tasks (solved + failed) that succeeded.
        finished = self.solved + self.failed
        if not finished:
            return 0
        return self.solved / finished * 100

    @property
    def throughput(self):
        # Successful solves per minute since start_time.
        elapsed = time.time() - self.start_time
        if elapsed <= 0:
            return 0
        return self.solved / elapsed * 60

    def report(self):
        """Return a one-line, pipe-separated summary of all metrics."""
        parts = [
            f"Submitted: {self.submitted}",
            f"Solved: {self.solved}",
            f"Failed: {self.failed}",
            f"Avg time: {self.avg_solve_time:.1f}s",
            f"Success: {self.success_rate:.1f}%",
            f"Throughput: {self.throughput:.0f}/min",
        ]
        return " | ".join(parts)

Troubleshooting

| Symptom | Cause | Fix |
| --- | --- | --- |
| Queue grows but tasks don't complete | Too many workers overwhelm the API | Reduce `max_workers` / `max_concurrent` |
| `ERROR_NO_SLOT_AVAILABLE` | API concurrency limit hit | Add a delay between submissions |
| Tasks stuck in queue | Worker threads died on an exception | Wrap the worker loop in try/except |
| Memory grows over time | Results never consumed | Call `get_results()` periodically |
| Async queue blocks forever | Missing `await` | Ensure all async calls are awaited |

Frequently asked questions

How many concurrent solves can I run?

CaptchaAI handles concurrency on the server side. Start with 10 concurrent workers and increase based on your plan limits. Check ERROR_NO_SLOT_AVAILABLE to know when to throttle.

Should I use threading or asyncio?

Use asyncio for new projects — it handles I/O-bound CAPTCHA solving more efficiently. Use threading if integrating into existing synchronous code.

How do I handle API rate limits?

Use a semaphore (async) or bounded queue (threading) to cap concurrent requests. Add a short delay between submissions if you hit ERROR_NO_SLOT_AVAILABLE.


Summary

A CAPTCHA solving queue decouples submission from polling, enabling parallel solving with CaptchaAI. Choose threading for synchronous code, asyncio for modern Python, and producer-consumer for continuous workloads.

Discussions (0)

No comments yet.

Related Posts

DevOps & Scaling Blue-Green Deployment for CAPTCHA Solving Infrastructure
Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switching, and rollback strategies with Captcha AI.

Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switchin...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Ansible Playbooks for CaptchaAI Worker Deployment
Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates, and health checks across your server fleet.

Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Horizontal Scaling CAPTCHA Solving Workers: When and How
Scale CAPTCHA solving horizontally — identify bottlenecks, add workers dynamically, auto-scale based on queue depth, and manage costs with Captcha AI.

Scale CAPTCHA solving horizontally — identify bottlenecks, add workers dynamically, auto-scale based on queue...

Automation Python All CAPTCHA Types
Mar 07, 2026
DevOps & Scaling AWS Lambda + CaptchaAI: Serverless CAPTCHA Solving
Integrate Captcha AI with AWS Lambda for serverless CAPTCHA solving.

Integrate Captcha AI with AWS Lambda for serverless CAPTCHA solving. Deploy functions, manage API keys with Se...

Automation Python All CAPTCHA Types
Feb 17, 2026
Tutorials Health Check Endpoints for CAPTCHA Solving Workers
Build health check endpoints for CAPTCHA solving workers — expose liveness, readiness, and dependency checks so orchestrators can detect and replace unhealthy i...

Build health check endpoints for CAPTCHA solving workers — expose liveness, readiness, and dependency checks s...

Automation Python All CAPTCHA Types
Jan 27, 2026
DevOps & Scaling Building Event-Driven CAPTCHA Solving with AWS SNS and CaptchaAI
Build an event-driven CAPTCHA solving pipeline using AWS SNS for fan-out notifications and Captcha AI — decouple task submission from result processing.

Build an event-driven CAPTCHA solving pipeline using AWS SNS for fan-out notifications and Captcha AI — decoup...

Automation Python All CAPTCHA Types
Jan 21, 2026
Tutorials CAPTCHA Session State Management Across Distributed Workers
Manage CAPTCHA session state across distributed workers — cookies, tokens, and browser context between machines using Redis, shared storage, and session seriali...

Manage CAPTCHA session state across distributed workers — cookies, tokens, and browser context between machine...

Automation Python All CAPTCHA Types
Mar 29, 2026
DevOps & Scaling High Availability CAPTCHA Solving: Failover and Redundancy
Build a high-availability CAPTCHA solving system — automatic failover, health checks, redundant workers, and graceful degradation with Captcha AI.

Build a high-availability CAPTCHA solving system — automatic failover, health checks, redundant workers, and g...

Automation Python All CAPTCHA Types
Mar 27, 2026
DevOps & Scaling Auto-Scaling CAPTCHA Solving Workers
Build auto-scaling CAPTCHA solving workers that adjust capacity based on queue depth, balance, and solve rates.

Build auto-scaling CAPTCHA solving workers that adjust capacity based on queue depth, balance, and solve rates...

Automation Python All CAPTCHA Types
Mar 23, 2026
Tutorials Queue-Based Batch CAPTCHA Processing with Priority Levels
How to build a priority queue system for CAPTCHA solving — assign priority levels to tasks, process high-priority requests first, manage concurrency limits, and...

How to build a priority queue system for CAPTCHA solving — assign priority levels to tasks, process high-prior...

Automation Python All CAPTCHA Types
Mar 20, 2026
Tutorials Extracting reCAPTCHA Parameters from Page Source
Extract re CAPTCHA parameters from any web page — sitekey, action, data-s, enterprise flag, and version — using regex, DOM queries, and network interception.

Extract all re CAPTCHA parameters from any web page — sitekey, action, data-s, enterprise flag, and version —...

Python reCAPTCHA v2 Web Scraping
Apr 07, 2026
Tutorials Handling Multiple CAPTCHAs on a Single Page
Learn how to detect and solve multiple CAPTCHAs on a single web page using Captcha AI.

Learn how to detect and solve multiple CAPTCHAs on a single web page using Captcha AI. Covers multi-iframe ext...

Python reCAPTCHA v2 Cloudflare Turnstile
Apr 09, 2026