When multiple workflows submit CAPTCHA tasks simultaneously, a flat FIFO queue treats all tasks equally. A priority queue ensures time-sensitive tasks — like user-facing form submissions — get solved before background batch jobs.
Priority Levels
| Priority | Use Case | Target Solve Time |
|---|---|---|
| critical (0) | User waiting in real-time (checkout, login) | < 30 seconds |
| high (1) | Time-sensitive automation (price monitoring) | < 60 seconds |
| normal (2) | Standard batch processing | < 120 seconds |
| low (3) | Background jobs, non-urgent scraping | Best effort |
Lower number = higher priority.
Python Priority Queue
import time
import heapq
import threading
import requests
from dataclasses import dataclass, field
from typing import Any
# CaptchaAI credentials and endpoints (2captcha-compatible HTTP API).
API_KEY = "YOUR_API_KEY"  # replace with your real account key
SUBMIT_URL = "https://ocr.captchaai.com/in.php"  # task submission endpoint
RESULT_URL = "https://ocr.captchaai.com/res.php"  # result polling endpoint
@dataclass(order=True)
class CaptchaTask:
    """A queued CAPTCHA job.

    With ``order=True``, comparison uses the compare-enabled fields in
    declaration order: ``priority`` first, then ``sequence``.  Lower
    priority numbers sort first on the heap, and the monotonically
    increasing ``sequence`` keeps equal-priority tasks FIFO.  All other
    fields are excluded from comparison via ``compare=False``.
    """
    priority: int  # 0=critical .. 3=low; lower is dispatched first
    created_at: float = field(compare=False)  # time.monotonic() at submission
    task_data: dict = field(compare=False)  # CAPTCHA parameters ("type", "sitekey", "pageurl")
    callback: Any = field(default=None, compare=False, repr=False)  # callable(token, error) or None
    sequence: int = field(default=0) # Tie-breaker for same-priority tasks
class PriorityQueueSolver:
    """Thread-based CAPTCHA solver that processes tasks in priority order.

    Tasks live in a binary min-heap ordered by (priority, sequence):
    lower priority numbers are dispatched first and equal priorities run
    FIFO.  A background dispatcher thread hands tasks to short-lived
    worker threads, at most ``max_workers`` at a time.
    """

    # Human-readable labels indexed by priority level (0 = most urgent).
    PRIORITY_NAMES = ("critical", "high", "normal", "low")

    def __init__(self, max_workers=10):
        """Create the solver and start its dispatcher thread.

        Args:
            max_workers: Maximum number of CAPTCHAs processed concurrently.
        """
        self.queue = []  # heap of CaptchaTask
        self.lock = threading.Lock()  # guards queue, stats, and counters
        self.max_workers = max_workers
        self.active_workers = 0
        self.sequence = 0  # FIFO tie-breaker for equal priorities
        self.running = True
        self.stats = {"submitted": 0, "solved": 0, "failed": 0}
        # Daemon thread: an un-shutdown solver won't block interpreter exit.
        self.dispatcher = threading.Thread(target=self._dispatch_loop, daemon=True)
        self.dispatcher.start()

    def submit(self, task_data, priority=2, callback=None):
        """Add a CAPTCHA task to the priority queue.

        Args:
            task_data: Dict describing the CAPTCHA ("type", "sitekey", "pageurl").
            priority: 0=critical .. 3=low; lower numbers dispatch first.
            callback: Optional callable(token, error) invoked on completion.

        Returns:
            The queued CaptchaTask instance.
        """
        with self.lock:
            self.sequence += 1
            task = CaptchaTask(
                priority=priority,
                created_at=time.monotonic(),
                task_data=task_data,
                callback=callback,
                sequence=self.sequence,
            )
            heapq.heappush(self.queue, task)
            self.stats["submitted"] += 1
            return task

    def _dispatch_loop(self):
        """Continuously pop the highest-priority task and hand it to a worker."""
        while self.running:
            task = None
            with self.lock:
                if self.queue and self.active_workers < self.max_workers:
                    task = heapq.heappop(self.queue)
                    self.active_workers += 1
            if task is not None:
                threading.Thread(
                    target=self._process_task, args=(task,), daemon=True
                ).start()
                # Keep draining without sleeping while capacity and work remain.
                continue
            time.sleep(0.1)

    def _process_task(self, task):
        """Submit one CAPTCHA to the API and poll until solved, failed, or timed out."""
        try:
            data = task.task_data
            params = {"key": API_KEY, "json": 1}
            captcha_type = data.get("type", "recaptcha_v2")
            if captcha_type == "recaptcha_v2":
                params.update({
                    "method": "userrecaptcha",
                    "googlekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            elif captcha_type == "turnstile":
                params.update({
                    "method": "turnstile",
                    "sitekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            elif captcha_type == "hcaptcha":
                params.update({
                    "method": "hcaptcha",
                    "sitekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            else:
                # Fail fast instead of submitting a request without a method.
                self._complete(task, None, f"unsupported captcha type: {captcha_type}")
                return
            # Submit
            response = requests.post(SUBMIT_URL, data=params, timeout=30)
            result = response.json()
            if result.get("status") != 1:
                self._complete(task, None, result.get("request", "submit failed"))
                return
            task_id = result["request"]
            # Poll every 5 s for up to 5 minutes.
            for _ in range(60):
                time.sleep(5)
                poll = requests.get(RESULT_URL, params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15).json()
                if poll.get("request") == "CAPCHA_NOT_READY":
                    continue
                if poll.get("status") == 1:
                    self._complete(task, poll["request"], None)
                    return
                self._complete(task, None, poll.get("request", "poll error"))
                return
            self._complete(task, None, "TIMEOUT")
        except Exception as e:
            self._complete(task, None, str(e))
        finally:
            # Free the worker slot regardless of outcome.
            with self.lock:
                self.active_workers -= 1

    def _complete(self, task, token, error):
        """Record the outcome of *task*, log it, and invoke its callback."""
        with self.lock:
            if token:
                self.stats["solved"] += 1
            else:
                self.stats["failed"] += 1
        wait_time = time.monotonic() - task.created_at
        # Guard against out-of-range priorities instead of raising IndexError
        # (mirrors the JS implementation's "unknown" fallback).
        names = self.PRIORITY_NAMES
        if isinstance(task.priority, int) and 0 <= task.priority < len(names):
            priority_name = names[task.priority]
        else:
            priority_name = "unknown"
        if token:
            print(f" [{priority_name}] Solved in {wait_time:.1f}s (token={len(token)} chars)")
        else:
            print(f" [{priority_name}] Failed: {error} ({wait_time:.1f}s)")
        if task.callback:
            # A raising user callback must not kill the worker thread
            # (see Troubleshooting: "Wrap callbacks in try/catch").
            try:
                task.callback(token, error)
            except Exception as cb_err:
                print(f" [{priority_name}] Callback raised: {cb_err}")

    def queue_size(self):
        """Return the number of tasks still waiting (excludes in-flight tasks)."""
        with self.lock:
            return len(self.queue)

    def get_stats(self):
        """Return a snapshot of counters plus queue depth and active workers."""
        with self.lock:
            return {**self.stats, "queue_size": len(self.queue), "active": self.active_workers}

    def shutdown(self, wait=True):
        """Stop accepting new tasks and optionally wait for active ones.

        NOTE: tasks still queued but not yet dispatched are discarded once
        the dispatcher loop stops.
        """
        self.running = False
        if wait:
            while self.active_workers > 0:
                time.sleep(0.5)
# Usage example: mix priorities and let the dispatcher order the work.
solver = PriorityQueueSolver(max_workers=10)

# Critical: a user is waiting at checkout right now.
def _on_checkout(token, err):
    print(f"Checkout: {'OK' if token else err}")

solver.submit(
    {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://checkout.example.com"},
    priority=0,
    callback=_on_checkout,
)

# Normal: five identical batch-processing tasks.
for _ in range(5):
    solver.submit(
        {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://example.com/page"},
        priority=2,
    )

# Low: a background job that can wait.
solver.submit(
    {"type": "turnstile", "sitekey": "SITE_KEY", "pageurl": "https://example.com"},
    priority=3,
)

# Give the workers time to finish, then report and stop.
time.sleep(120)
print(solver.get_stats())
solver.shutdown()
JavaScript Priority Queue (Node.js)
// CaptchaAI credentials and endpoints (2captcha-compatible HTTP API).
const API_KEY = "YOUR_API_KEY"; // replace with your real account key
const SUBMIT_URL = "https://ocr.captchaai.com/in.php"; // task submission endpoint
const RESULT_URL = "https://ocr.captchaai.com/res.php"; // result polling endpoint
/**
 * Priority-queue CAPTCHA solver for Node.js.
 *
 * Pending tasks are kept sorted by (priority, sequence): lower priority
 * numbers run first, and the monotonically increasing sequence keeps
 * equal-priority tasks FIFO. An async dispatch loop hands tasks to
 * concurrent workers, at most `maxWorkers` at a time.
 */
class PriorityQueueSolver {
  constructor(maxWorkers = 10) {
    this.queue = []; // [{ priority, sequence, taskData, resolve, reject, createdAt }]
    this.maxWorkers = maxWorkers;
    this.activeWorkers = 0;
    this.sequence = 0; // FIFO tie-breaker within a priority level
    this.stats = { submitted: 0, solved: 0, failed: 0 };
    this.running = true;
    this._dispatchLoop(); // fire-and-forget async loop
  }

  /**
   * Queue a CAPTCHA task.
   * @param {{type?: string, sitekey: string, pageurl: string}} taskData
   * @param {number} priority 0=critical .. 3=low (lower runs first)
   * @returns {Promise<string>} resolves with the token, rejects on failure
   */
  solve(taskData, priority = 2) {
    return new Promise((resolve, reject) => {
      this.queue.push({
        priority,
        sequence: this.sequence++,
        taskData,
        resolve,
        reject,
        createdAt: Date.now(),
      });
      // Keep sorted: lower priority number = first; sequence keeps FIFO.
      this.queue.sort((a, b) => a.priority - b.priority || a.sequence - b.sequence);
      this.stats.submitted++;
    });
  }

  async _dispatchLoop() {
    while (this.running) {
      // Fill every idle worker slot each tick (not just one per 100 ms).
      while (this.activeWorkers < this.maxWorkers && this.queue.length > 0) {
        const task = this.queue.shift();
        this.activeWorkers++;
        this._processTask(task).finally(() => this.activeWorkers--);
      }
      await new Promise((r) => setTimeout(r, 100));
    }
  }

  /** Submit one CAPTCHA and poll until solved, failed, or timed out. */
  async _processTask(task) {
    const priorityNames = ["critical", "high", "normal", "low"];
    const pName = priorityNames[task.priority] || "unknown";
    try {
      const params = { key: API_KEY, json: 1 };
      const type = task.taskData.type || "recaptcha_v2";
      if (type === "recaptcha_v2") {
        Object.assign(params, {
          method: "userrecaptcha",
          googlekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
      } else if (type === "turnstile") {
        Object.assign(params, {
          method: "turnstile",
          sitekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
      } else if (type === "hcaptcha") {
        // Parity with the Python implementation above.
        Object.assign(params, {
          method: "hcaptcha",
          sitekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
      }
      // Submit
      const response = await fetch(SUBMIT_URL, {
        method: "POST",
        body: new URLSearchParams(params),
      });
      const result = await response.json();
      if (result.status !== 1) throw new Error(result.request || "Submit failed");
      const taskId = result.request;
      // Poll every 5 s for up to 5 minutes.
      for (let i = 0; i < 60; i++) {
        await new Promise((r) => setTimeout(r, 5000));
        const url = new URL(RESULT_URL);
        url.searchParams.set("key", API_KEY);
        url.searchParams.set("action", "get");
        url.searchParams.set("id", taskId);
        url.searchParams.set("json", "1");
        const poll = await (await fetch(url)).json();
        if (poll.request === "CAPCHA_NOT_READY") continue;
        if (poll.status === 1) {
          const elapsed = ((Date.now() - task.createdAt) / 1000).toFixed(1);
          console.log(` [${pName}] Solved in ${elapsed}s`);
          this.stats.solved++;
          task.resolve(poll.request);
          return;
        }
        throw new Error(poll.request || "Poll failed");
      }
      throw new Error("TIMEOUT");
    } catch (err) {
      const elapsed = ((Date.now() - task.createdAt) / 1000).toFixed(1);
      console.log(` [${pName}] Failed: ${err.message} (${elapsed}s)`);
      this.stats.failed++;
      task.reject(err);
    }
  }

  /** Snapshot of counters plus queue depth and active worker count. */
  getStats() {
    return { ...this.stats, queueSize: this.queue.length, active: this.activeWorkers };
  }

  /** Stop the dispatch loop; queued-but-undispatched tasks are discarded. */
  shutdown() {
    this.running = false;
  }
}
// Usage
const solver = new PriorityQueueSolver(10);

// Critical priority — attach a .catch so a failed solve does not become
// an unhandled promise rejection (the solver rejects on failure/timeout).
solver
  .solve(
    { type: "recaptcha_v2", sitekey: "SITE_KEY", pageurl: "https://checkout.example.com" },
    0 // critical
  )
  .then((token) => console.log("Checkout token ready"))
  .catch((err) => console.error(`Checkout failed: ${err.message}`));

// Normal priority batch
for (let i = 0; i < 5; i++) {
  solver
    .solve(
      { type: "recaptcha_v2", sitekey: "SITE_KEY", pageurl: "https://example.com" },
      2 // normal
    )
    .catch((err) => console.error(`Batch ${i} failed: ${err.message}`));
}
Concurrency Allocation by Priority
Reserve worker slots for high-priority tasks to prevent starvation:
| Priority | Reserved Workers (out of 10) | Shared Workers |
|---|---|---|
| Critical | 2 (always available) | Can use any idle worker |
| High | 2 | Can use any non-reserved worker |
| Normal | 0 | Uses remaining idle workers |
| Low | 0 | Only runs when no other tasks waiting |
Backpressure Handling
When the queue grows too large:
| Queue Size | Action |
|---|---|
| < 100 | Normal operation |
| 100–500 | Log warning; reject low priority tasks |
| 500–1000 | Reject low and normal; only accept high/critical |
| > 1000 | Reject all new tasks; drain existing queue |
def submit(self, task_data, priority=2, callback=None):
    """Backpressure-aware submit: reject tasks once the queue saturates.

    Thresholds mirror the backpressure table above: >1000 rejects
    everything, >500 accepts only critical (0) / high (1), >100 rejects
    low (3) only.

    NOTE(review): ``QueueFullError`` is not defined in this snippet —
    define it (e.g. ``class QueueFullError(Exception): ...``) before use.
    ``len(self.queue)`` is read without the lock here, so the size is a
    best-effort snapshot; presumably exactness is not required — confirm.
    """
    queue_size = len(self.queue)
    # > 1000: reject all new tasks and let the queue drain.
    if queue_size > 1000:
        raise QueueFullError("Queue capacity exceeded")
    # 500-1000: only high/critical (priority <= 1) accepted.
    if queue_size > 500 and priority > 1:
        raise QueueFullError(f"Only high/critical priority accepted (queue={queue_size})")
    # 100-500: reject low (priority 3) only.
    if queue_size > 100 and priority > 2:
        raise QueueFullError(f"Low priority rejected (queue={queue_size})")
    # ... normal submit logic
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Low-priority tasks never execute | High-priority tasks constantly arriving | Reserve a minimum slot for low priority; add starvation timeout |
| Critical tasks wait behind normal tasks | Priority not being respected | Verify heap ordering; check that dispatcher pops highest priority first |
| Queue grows unbounded | Submission rate exceeds solve rate | Add backpressure; increase max_workers; reduce submission rate |
| Tasks solved but callback not called | Exception in callback function | Wrap callbacks in try/catch |
| All tasks timing out | API key issue or network problem | Check balance; verify API connectivity |
FAQ
What happens if a critical task fails?
Implement retry logic for critical tasks — automatically resubmit with the same priority. Normal and low tasks can fail without retry to preserve capacity for critical work.
Can I change a task's priority after submission?
With a heap-based queue, you'd need to remove and re-insert the task. In practice, it's simpler to cancel the original and submit a new task at the desired priority.
How do I prevent priority starvation of low-priority tasks?
Add an age-based boost: increase a task's effective priority by 1 level for every 60 seconds it waits. A low task waiting 3 minutes becomes critical priority, ensuring it eventually runs.
Related Articles
Next Steps
Build priority-based CAPTCHA processing into your system — get your CaptchaAI API key and start queuing tasks by importance.
Related guides:
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In — No comments yet.