Queue-Based Batch CAPTCHA Processing with Priority Levels

When multiple workflows submit CAPTCHA tasks simultaneously, a flat FIFO queue treats all tasks equally. A priority queue ensures time-sensitive tasks — like user-facing form submissions — get solved before background batch jobs.

Priority Levels

| Priority | Use Case | Target Solve Time |
|---|---|---|
| critical (0) | User waiting in real time (checkout, login) | < 30 seconds |
| high (1) | Time-sensitive automation (price monitoring) | < 60 seconds |
| normal (2) | Standard batch processing | < 120 seconds |
| low (3) | Background jobs, non-urgent scraping | Best effort |

Lower number = higher priority.
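
To see the ordering rule in isolation, here is a minimal sketch using Python's standard `heapq` module. The task labels and the `(priority, sequence, label)` tuple layout are illustrative assumptions, not part of any API; the sequence counter keeps tasks of equal priority in submission order.

```python
import heapq
from itertools import count

# Hypothetical labels; each entry is (priority, sequence, label).
_sequence = count()
heap = []


def push(priority, label):
    """Push a task; the sequence number breaks ties in FIFO order."""
    heapq.heappush(heap, (priority, next(_sequence), label))


push(2, "batch-1")
push(0, "checkout")
push(2, "batch-2")
push(3, "background")
push(1, "price-check")

# Pop everything: lowest priority number comes out first.
drain_order = [heapq.heappop(heap)[2] for _ in range(len(heap))]
print(drain_order)
# → ['checkout', 'price-check', 'batch-1', 'batch-2', 'background']
```

The critical task jumps ahead of the batch tasks submitted before it, while the two normal tasks keep their relative order.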

Python Priority Queue

import time
import heapq
import threading
import requests
from dataclasses import dataclass, field
from typing import Any

API_KEY = "YOUR_API_KEY"
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"


@dataclass(order=True)
class CaptchaTask:
    priority: int
    created_at: float = field(compare=False)
    task_data: dict = field(compare=False)
    callback: Any = field(default=None, compare=False, repr=False)
    sequence: int = field(default=0)  # Tie-breaker for same-priority tasks


class PriorityQueueSolver:
    def __init__(self, max_workers=10):
        self.queue = []
        self.lock = threading.Lock()
        self.max_workers = max_workers
        self.active_workers = 0
        self.sequence = 0
        self.running = True
        self.stats = {"submitted": 0, "solved": 0, "failed": 0}

        # Start dispatcher thread
        self.dispatcher = threading.Thread(target=self._dispatch_loop, daemon=True)
        self.dispatcher.start()

    def submit(self, task_data, priority=2, callback=None):
        """Add a CAPTCHA task to the priority queue."""
        with self.lock:
            self.sequence += 1
            task = CaptchaTask(
                priority=priority,
                created_at=time.monotonic(),
                task_data=task_data,
                callback=callback,
                sequence=self.sequence,
            )
            heapq.heappush(self.queue, task)
            self.stats["submitted"] += 1
        return task

    def _dispatch_loop(self):
        """Continuously pull tasks from the queue and process them."""
        while self.running:
            with self.lock:
                # Start a worker only when a slot is free and a task is waiting
                if self.active_workers < self.max_workers and self.queue:
                    # heappop returns the lowest number, i.e. the highest priority
                    task = heapq.heappop(self.queue)
                    self.active_workers += 1
                    threading.Thread(
                        target=self._process_task, args=(task,), daemon=True
                    ).start()
            time.sleep(0.1)

    def _process_task(self, task):
        """Submit and poll a single CAPTCHA task."""
        try:
            data = task.task_data
            params = {"key": API_KEY, "json": 1}

            captcha_type = data.get("type", "recaptcha_v2")
            if captcha_type == "recaptcha_v2":
                params.update({
                    "method": "userrecaptcha",
                    "googlekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            elif captcha_type == "turnstile":
                params.update({
                    "method": "turnstile",
                    "sitekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
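            elif captcha_type == "hcaptcha":
                params.update({
                    "method": "hcaptcha",
                    "sitekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            else:
                # Fail fast instead of submitting a request with no method
                raise ValueError(f"Unsupported CAPTCHA type: {captcha_type}")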

            # Submit
            response = requests.post(SUBMIT_URL, data=params, timeout=30)
            result = response.json()

            if result.get("status") != 1:
                self._complete(task, None, result.get("request", "submit failed"))
                return

            task_id = result["request"]

            # Poll
            for _ in range(60):
                time.sleep(5)
                poll = requests.get(RESULT_URL, params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15).json()

                if poll.get("request") == "CAPCHA_NOT_READY":
                    continue
                if poll.get("status") == 1:
                    self._complete(task, poll["request"], None)
                    return

                self._complete(task, None, poll.get("request", "poll error"))
                return

            self._complete(task, None, "TIMEOUT")

        except Exception as e:
            self._complete(task, None, str(e))
        finally:
            with self.lock:
                self.active_workers -= 1

    def _complete(self, task, token, error):
        """Handle task completion."""
        with self.lock:
            if token:
                self.stats["solved"] += 1
            else:
                self.stats["failed"] += 1

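        wait_time = time.monotonic() - task.created_at
        names = ["critical", "high", "normal", "low"]
        # Guard against priorities outside 0-3 instead of raising IndexError
        priority_name = names[task.priority] if 0 <= task.priority < len(names) else "unknown"

        if token:
            print(f"  [{priority_name}] Solved in {wait_time:.1f}s (token={len(token)} chars)")
        else:
            print(f"  [{priority_name}] Failed: {error} ({wait_time:.1f}s)")

        if task.callback:
            # A failing callback must not propagate: _process_task would catch it
            # and call _complete a second time, double-counting the task.
            try:
                task.callback(token, error)
            except Exception as exc:
                print(f"  [{priority_name}] Callback raised: {exc}")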

    def queue_size(self):
        return len(self.queue)

    def get_stats(self):
        return {**self.stats, "queue_size": len(self.queue), "active": self.active_workers}

    def shutdown(self, wait=True):
        """Stop dispatching queued tasks and optionally wait for active ones.

        Tasks still in the queue when this is called are never started.
        """
        self.running = False
        if wait:
            while self.active_workers > 0:
                time.sleep(0.5)


# Usage
solver = PriorityQueueSolver(max_workers=10)

# Critical: user-facing checkout
solver.submit(
    {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://checkout.example.com"},
    priority=0,
    callback=lambda token, err: print(f"Checkout: {'OK' if token else err}"),
)

# Normal: batch processing
for i in range(5):
    solver.submit(
        {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://example.com/page"},
        priority=2,
    )

# Low: background job
solver.submit(
    {"type": "turnstile", "sitekey": "SITE_KEY", "pageurl": "https://example.com"},
    priority=3,
)

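# Wait until the queue is drained and no worker is active
while True:
    with solver.lock:  # read both values consistently with the dispatcher
        idle = not solver.queue and solver.active_workers == 0
    if idle:
        break
    time.sleep(1)

print(solver.get_stats())
solver.shutdown()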

JavaScript Priority Queue (Node.js)

const API_KEY = "YOUR_API_KEY";
const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class PriorityQueueSolver {
  constructor(maxWorkers = 10) {
    this.queue = []; // [{ priority, sequence, taskData, resolve, reject }]
    this.maxWorkers = maxWorkers;
    this.activeWorkers = 0;
    this.sequence = 0;
    this.stats = { submitted: 0, solved: 0, failed: 0 };
    this.running = true;
    this._dispatchLoop();
  }

  solve(taskData, priority = 2) {
    return new Promise((resolve, reject) => {
      this.queue.push({
        priority,
        sequence: this.sequence++,
        taskData,
        resolve,
        reject,
        createdAt: Date.now(),
      });
      // Keep sorted: lower priority number = first
      this.queue.sort((a, b) => a.priority - b.priority || a.sequence - b.sequence);
      this.stats.submitted++;
    });
  }

  async _dispatchLoop() {
    while (this.running) {
      if (this.activeWorkers < this.maxWorkers && this.queue.length > 0) {
        const task = this.queue.shift();
        this.activeWorkers++;
        this._processTask(task).finally(() => this.activeWorkers--);
      }
      await new Promise((r) => setTimeout(r, 100));
    }
  }

  async _processTask(task) {
    const priorityNames = ["critical", "high", "normal", "low"];
    const pName = priorityNames[task.priority] || "unknown";

    try {
      const params = { key: API_KEY, json: 1 };
      const type = task.taskData.type || "recaptcha_v2";

      if (type === "recaptcha_v2") {
        Object.assign(params, {
          method: "userrecaptcha",
          googlekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
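      } else if (type === "turnstile" || type === "hcaptcha") {
        // Both take the same parameters; the method name matches the type
        Object.assign(params, {
          method: type,
          sitekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
      } else {
        throw new Error(`Unsupported CAPTCHA type: ${type}`);
      }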

      // Submit
      const response = await fetch(SUBMIT_URL, {
        method: "POST",
        body: new URLSearchParams(params),
      });
      const result = await response.json();
      if (result.status !== 1) throw new Error(result.request || "Submit failed");

      const taskId = result.request;

      // Poll
      for (let i = 0; i < 60; i++) {
        await new Promise((r) => setTimeout(r, 5000));
        const url = new URL(RESULT_URL);
        url.searchParams.set("key", API_KEY);
        url.searchParams.set("action", "get");
        url.searchParams.set("id", taskId);
        url.searchParams.set("json", "1");

        const poll = await (await fetch(url)).json();
        if (poll.request === "CAPCHA_NOT_READY") continue;
        if (poll.status === 1) {
          const elapsed = ((Date.now() - task.createdAt) / 1000).toFixed(1);
          console.log(`  [${pName}] Solved in ${elapsed}s`);
          this.stats.solved++;
          task.resolve(poll.request);
          return;
        }
        throw new Error(poll.request || "Poll failed");
      }
      throw new Error("TIMEOUT");
    } catch (err) {
      const elapsed = ((Date.now() - task.createdAt) / 1000).toFixed(1);
      console.log(`  [${pName}] Failed: ${err.message} (${elapsed}s)`);
      this.stats.failed++;
      task.reject(err);
    }
  }

  getStats() {
    return { ...this.stats, queueSize: this.queue.length, active: this.activeWorkers };
  }

  shutdown() {
    this.running = false;
  }
}

// Usage
const solver = new PriorityQueueSolver(10);

// Critical priority
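solver
  .solve(
    { type: "recaptcha_v2", sitekey: "SITE_KEY", pageurl: "https://checkout.example.com" },
    0 // critical
  )
  .then((token) => console.log("Checkout token ready"))
  // Without a catch, a failed solve becomes an unhandled promise rejection
  .catch((err) => console.error(`Checkout failed: ${err.message}`));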

// Normal priority batch
for (let i = 0; i < 5; i++) {
  solver.solve(
    { type: "recaptcha_v2", sitekey: "SITE_KEY", pageurl: "https://example.com" },
    2 // normal
  ).catch((err) => console.error(`Batch ${i} failed: ${err.message}`));
}

Concurrency Allocation by Priority

Reserve worker slots for high-priority tasks to prevent starvation:

| Priority | Reserved Workers (out of 10) | Shared Workers |
|---|---|---|
| Critical | 2 (always available) | Can use any idle worker |
| High | 2 | Can use any non-reserved worker |
| Normal | 0 | Uses remaining idle workers |
| Low | 0 | Only runs when no other tasks waiting |
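
One way to express this allocation is a per-priority cap on active workers. The sketch below is an assumption about how the reservation could be enforced, not code from the solver above: `RESERVED`, `worker_limit`, and `can_start` are hypothetical names, and a task may start only if the active count is below the total minus the slots reserved for higher priorities.

```python
MAX_WORKERS = 10
# Hypothetical reservation map: priority -> slots held back from lower priorities
RESERVED = {0: 2, 1: 2}


def worker_limit(priority):
    """Highest active-worker count at which a task of this priority may still start."""
    held_back = sum(slots for p, slots in RESERVED.items() if p < priority)
    return MAX_WORKERS - held_back


def can_start(priority, active_workers):
    """True if starting this task leaves the reserved slots untouched."""
    return active_workers < worker_limit(priority)


for p, name in enumerate(["critical", "high", "normal", "low"]):
    print(f"{name}: may start while fewer than {worker_limit(p)} workers are active")
```

In a dispatcher, the check would go before the pop: peek at `queue[0]`, and start it only if `can_start(task.priority, active_workers)` holds. Because the heap always surfaces the highest-priority task first, a low task reaches the head only when nothing more urgent is waiting.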

Backpressure Handling

When the queue grows too large:

| Queue Size | Action |
|---|---|
| ≤ 100 | Normal operation |
| 101–500 | Log warning; reject low priority tasks |
| 501–1000 | Reject low and normal; only accept high/critical |
| > 1000 | Reject all new tasks; drain existing queue |

class QueueFullError(Exception):
    """Raised when a task is rejected because of backpressure."""


# Replaces PriorityQueueSolver.submit
def submit(self, task_data, priority=2, callback=None):
    queue_size = len(self.queue)

    # Check the strictest threshold first
    if queue_size > 1000:
        raise QueueFullError("Queue capacity exceeded")
    if queue_size > 500 and priority > 1:
        raise QueueFullError(f"Only high/critical priority accepted (queue={queue_size})")
    if queue_size > 100 and priority > 2:
        raise QueueFullError(f"Low priority rejected (queue={queue_size})")

    # ... normal submit logic

Troubleshooting

| Issue | Cause | Fix |
|---|---|---|
| Low-priority tasks never execute | High-priority tasks constantly arriving | Reserve a minimum slot for low priority; add starvation timeout |
| Critical tasks wait behind normal tasks | Priority not being respected | Verify heap ordering; check that dispatcher pops highest priority first |
| Queue grows unbounded | Submission rate exceeds solve rate | Add backpressure; increase max_workers; reduce submission rate |
| Tasks solved but callback not called | Exception in callback function | Wrap callbacks in try/except (Python) or try/catch (JavaScript) |
| All tasks timing out | API key issue or network problem | Verify the API key and balance; check API connectivity |

FAQ

What happens if a critical task fails?

Implement retry logic for critical tasks — automatically resubmit with the same priority. Normal and low tasks can fail without retry to preserve capacity for critical work.
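
A minimal sketch of that retry policy, built on the callback mechanism. `submit_with_retry` is a hypothetical helper, and `fake_submit` is a stand-in used only so the example runs without network access; in practice you would pass something like `solver.submit`, assuming it accepts `(task_data, priority=..., callback=...)` as in the Python example above.

```python
def submit_with_retry(submit, task_data, priority, max_attempts=3, on_done=None):
    """Resubmit a failed task at the same priority, up to max_attempts in total."""
    attempts = 0

    def callback(token, error):
        if token is None and attempts < max_attempts:
            attempt()  # failed, and attempts remain: resubmit
            return
        if on_done:
            on_done(token, error)  # solved, or out of attempts

    def attempt():
        nonlocal attempts
        attempts += 1
        submit(task_data, priority=priority, callback=callback)

    attempt()


# Stand-in for a real submit function: fails twice, then succeeds.
calls = []


def fake_submit(task_data, priority=2, callback=None):
    calls.append(priority)
    if len(calls) < 3:
        callback(None, "ERROR_CAPTCHA_UNSOLVABLE")
    else:
        callback("token-123", None)


results = []
submit_with_retry(
    fake_submit,
    {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://checkout.example.com"},
    priority=0,
    on_done=lambda token, error: results.append((token, error)),
)
print(results)  # → [('token-123', None)]
```

Capping the number of attempts matters: an unbounded retry loop on critical tasks could itself crowd out everything else in the queue.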

Can I change a task's priority after submission?

With a heap-based queue, you'd need to remove and re-insert the task. In practice, it's simpler to cancel the original and submit a new task at the desired priority.
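
If you do need in-place changes, one common approach is lazy deletion: mark the old heap entry as cancelled and push a fresh one. The sketch below follows the pattern described in the Python `heapq` documentation; `ReprioritizableQueue` and its method names are hypothetical and not part of the solver above.

```python
import heapq
from itertools import count


class ReprioritizableQueue:
    """Heap with lazy deletion: cancelled entries are skipped when popped."""

    def __init__(self):
        self._heap = []
        self._entries = {}  # task_id -> live heap entry
        self._counter = count()

    def add(self, task_id, priority):
        """Add a task, or move an existing task to a new priority."""
        if task_id in self._entries:
            self.cancel(task_id)
        entry = [priority, next(self._counter), task_id]
        self._entries[task_id] = entry
        heapq.heappush(self._heap, entry)

    def cancel(self, task_id):
        """Mark the entry as cancelled; it stays in the heap until popped."""
        entry = self._entries.pop(task_id)
        entry[2] = None

    def pop(self):
        """Return (task_id, priority) for the most urgent live task."""
        while self._heap:
            priority, _, task_id = heapq.heappop(self._heap)
            if task_id is not None:
                del self._entries[task_id]
                return task_id, priority
        raise KeyError("pop from an empty queue")


q = ReprioritizableQueue()
q.add("a", 2)
q.add("b", 2)
q.add("c", 3)
q.add("c", 0)  # re-prioritize: low -> critical

order = [q.pop() for _ in range(3)]
print(order)  # → [('c', 0), ('a', 2), ('b', 2)]
```

The cancelled entry for `"c"` at priority 3 is still in the heap after the third pop; it is discarded the next time `pop` reaches it.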

How do I prevent priority starvation of low-priority tasks?

Add an age-based boost: increase a task's effective priority by 1 level for every 60 seconds it waits. A low task waiting 3 minutes becomes critical priority, ensuring it eventually runs.

Next Steps

Build priority-based CAPTCHA processing into your system — get your CaptchaAI API key and start queuing tasks by importance.
