Patterns for solving 1,000 to 100,000+ CAPTCHAs per hour reliably.
Pattern 1: Simple Worker Pool
Best for: 100–1,000 solves/hour.
┌──────────┐     ┌──────────────┐     ┌────────────┐
│ Scraper  │────▶│ Thread Pool  │────▶│ CaptchaAI  │
│ Tasks    │     │ (5-20        │     │ API        │
│          │◀────│ workers)     │◀────│            │
└──────────┘     └──────────────┘     └────────────┘
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests
class SimpleWorkerPool:
    """Solve a batch of CAPTCHAs concurrently with a fixed-size thread pool.

    Each task is submitted to ``in.php`` and then polled on ``res.php`` until
    the API reports a token, an error, or the polling budget runs out.
    """

    # Per-request HTTP timeout in seconds. Without one, a stalled connection
    # would block its worker thread indefinitely.
    REQUEST_TIMEOUT = 30
    # Seconds to wait after submission before the first poll.
    INITIAL_WAIT = 10
    # Seconds between polls while the API answers CAPCHA_NOT_READY.
    POLL_INTERVAL = 5
    # Maximum number of polls before giving up with a "timeout" error.
    MAX_POLLS = 60

    def __init__(self, api_key, max_workers=10):
        """Store the API key and the number of worker threads to use."""
        self.api_key = api_key
        self.max_workers = max_workers
        self.base = "https://ocr.captchaai.com"

    def _payload(self, params):
        """Return a copy of ``params`` with the API key and JSON flag added.

        A copy is used so the caller's dict is never modified (and the API
        key never leaks into task definitions the caller may log or reuse).
        """
        return {**params, "key": self.api_key, "json": 1}

    def _solve_one(self, params):
        """Submit one task and poll until it resolves.

        Returns ``{"token": ...}`` on success, or ``{"error": ...}`` when the
        API rejects the task, reports a failure, or polling is exhausted.
        Network and JSON-decoding exceptions propagate to the caller.
        """
        resp = requests.post(
            f"{self.base}/in.php",
            data=self._payload(params),
            timeout=self.REQUEST_TIMEOUT,
        ).json()
        if resp["status"] != 1:
            return {"error": resp["request"]}

        task_id = resp["request"]
        time.sleep(self.INITIAL_WAIT)
        for _ in range(self.MAX_POLLS):
            result = requests.get(
                f"{self.base}/res.php",
                params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                timeout=self.REQUEST_TIMEOUT,
            ).json()
            if result["request"] == "CAPCHA_NOT_READY":
                time.sleep(self.POLL_INTERVAL)
                continue
            if result["status"] == 1:
                return {"token": result["request"]}
            return {"error": result["request"]}
        return {"error": "timeout"}

    def solve_batch(self, tasks):
        """Solve every task concurrently.

        tasks: list of (identifier, params) tuples.

        Returns a dict mapping each identifier to the result dict of
        ``_solve_one``; an exception raised while solving is reported as
        ``{"error": str(exc)}`` for that identifier instead of propagating.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {
                pool.submit(self._solve_one, params): ident
                for ident, params in tasks
            }
            for future in as_completed(futures):
                ident = futures[future]
                try:
                    results[ident] = future.result()
                except Exception as e:
                    results[ident] = {"error": str(e)}
        return results
Pattern 2: Queue-Based Pipeline
Best for: 1,000–10,000 solves/hour with back-pressure control.
┌────────┐     ┌───────────┐     ┌──────────┐     ┌───────────┐     ┌────────┐
│Producer│────▶│  Submit   │────▶│ Pending  │────▶│   Poll    │────▶│Results │
│        │     │  Queue    │     │  Queue   │     │  Workers  │     │ Queue  │
└────────┘     └───────────┘     └──────────┘     └───────────┘     └────────┘
import queue
import threading
import time
import requests
class QueuePipeline:
    """Two-stage solving pipeline: submit workers feed poll workers.

    Producers call ``add``; submit workers post each task to ``in.php`` and
    hand the returned task id to the pending queue; poll workers query
    ``res.php`` until the task resolves and store the outcome in ``results``.
    The bounded submit queue (100 items) makes ``add`` block when producers
    outrun the workers.
    """

    # Per-request HTTP timeout in seconds. Without one, a stalled connection
    # would block its worker thread indefinitely.
    REQUEST_TIMEOUT = 30
    # Minimum seconds between submission and the first poll of a task.
    MIN_POLL_DELAY = 10

    def __init__(self, api_key, submit_workers=5, poll_workers=10, task_timeout=300):
        """Configure the pipeline; no threads run until ``start`` is called.

        task_timeout: seconds after submission at which an unresolved task is
        abandoned with ``{"error": "timeout"}`` instead of being re-queued.
        Pass ``None`` to keep re-queueing indefinitely.
        """
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.submit_queue = queue.Queue(maxsize=100)
        self.pending_queue = queue.Queue()
        self.results = {}
        self.results_lock = threading.Lock()
        # Shares results_lock so existing `with self.results_lock:` users stay
        # correct while get_result can be woken as soon as a result lands.
        self._results_ready = threading.Condition(self.results_lock)
        self._running = False
        self.submit_workers = submit_workers
        self.poll_workers = poll_workers
        self.task_timeout = task_timeout

    def start(self):
        """Spawn the daemon submit and poll worker threads."""
        self._running = True
        for _ in range(self.submit_workers):
            threading.Thread(target=self._submit_worker, daemon=True).start()
        for _ in range(self.poll_workers):
            threading.Thread(target=self._poll_worker, daemon=True).start()

    def stop(self):
        """Ask the workers to exit; each notices on its next loop iteration."""
        self._running = False

    def add(self, ident, params):
        """Queue a task for submission; blocks while the submit queue is full."""
        self.submit_queue.put((ident, params))

    def get_result(self, ident, timeout=300):
        """Wait up to ``timeout`` seconds for ``ident`` and pop its result.

        Returns the stored result dict, or ``{"error": "timeout"}`` if nothing
        was stored for ``ident`` in time.
        """
        deadline = time.time() + timeout
        with self._results_ready:
            while True:
                if ident in self.results:
                    return self.results.pop(ident)
                remaining = deadline - time.time()
                if remaining <= 0:
                    return {"error": "timeout"}
                # Cap the wait so results written directly into self.results
                # (without a notify) are still picked up within a second.
                self._results_ready.wait(min(remaining, 1))

    def _payload(self, params):
        """Return a copy of ``params`` with the API key and JSON flag added.

        A copy is used so the caller's dict is never modified.
        """
        return {**params, "key": self.api_key, "json": 1}

    def _store_result(self, ident, result):
        """Record the outcome for ``ident`` and wake any waiting get_result."""
        with self._results_ready:
            self.results[ident] = result
            self._results_ready.notify_all()

    def _requeue_or_expire(self, ident, task_id, submitted_at):
        """Put an unresolved task back on the pending queue unless it is too old.

        Returns True when the task was re-queued, False when it exceeded
        ``task_timeout`` and was stored as ``{"error": "timeout"}``.
        """
        expired = (
            self.task_timeout is not None
            and time.time() - submitted_at > self.task_timeout
        )
        if expired:
            self._store_result(ident, {"error": "timeout"})
            return False
        self.pending_queue.put((ident, task_id, submitted_at))
        return True

    def _submit_worker(self):
        """Worker loop: submit queued tasks and forward accepted ones for polling."""
        while self._running:
            try:
                ident, params = self.submit_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                resp = requests.post(
                    f"{self.base}/in.php",
                    data=self._payload(params),
                    timeout=self.REQUEST_TIMEOUT,
                ).json()
                if resp["status"] == 1:
                    self.pending_queue.put((ident, resp["request"], time.time()))
                else:
                    self._store_result(ident, {"error": resp["request"]})
            except Exception as e:
                # Report the failure to the caller instead of killing the worker.
                self._store_result(ident, {"error": str(e)})

    def _poll_worker(self):
        """Worker loop: poll pending tasks and store their final outcome."""
        while self._running:
            try:
                ident, task_id, submitted_at = self.pending_queue.get(timeout=1)
            except queue.Empty:
                continue
            # Wait at least MIN_POLL_DELAY seconds from submission
            wait = self.MIN_POLL_DELAY - (time.time() - submitted_at)
            if wait > 0:
                time.sleep(wait)
            try:
                resp = requests.get(
                    f"{self.base}/res.php",
                    params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                    timeout=self.REQUEST_TIMEOUT,
                ).json()
                if resp["request"] != "CAPCHA_NOT_READY":
                    if resp["status"] == 1:
                        self._store_result(ident, {"token": resp["request"]})
                    else:
                        self._store_result(ident, {"error": resp["request"]})
                    continue
                backoff = 3
            except Exception:
                # Transient failure (network, malformed response): retry later,
                # backing off a little longer than for a plain "not ready".
                backoff = 5
            if self._requeue_or_expire(ident, task_id, submitted_at):
                time.sleep(backoff)
Usage:
pipeline = QueuePipeline("YOUR_API_KEY")
pipeline.start()
try:
    # Add CAPTCHAs
    pipeline.add("page_1", {"method": "turnstile", "sitekey": "KEY", "pageurl": "URL"})
    pipeline.add("page_2", {"method": "userrecaptcha", "googlekey": "KEY", "pageurl": "URL"})
    # Get results (each call blocks until that task resolves or times out)
    result1 = pipeline.get_result("page_1")
    result2 = pipeline.get_result("page_2")
finally:
    # Always signal the workers to exit, even if a call above raises.
    pipeline.stop()
Pattern 3: Circuit Breaker
Prevents cascading failures when the solving API is degraded.
import time
import threading
class CircuitBreaker:
    """Thread-safe three-state circuit breaker.

    CLOSED lets every request through. After ``failure_threshold`` consecutive
    failures the breaker goes OPEN and rejects requests. Once ``reset_timeout``
    seconds have passed since the last failure it goes HALF_OPEN and admits a
    single probe request: success closes the breaker, failure re-opens it.
    """

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing — reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(self, failure_threshold=5, reset_timeout=60):
        """Create a closed breaker.

        failure_threshold: consecutive failures that trip the breaker.
        reset_timeout: seconds to stay OPEN before allowing a probe.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.lock = threading.Lock()
        # Time the current HALF_OPEN probe was admitted; None when no probe
        # is outstanding.
        self._probe_started_at = None

    def can_proceed(self):
        """Return True if the caller may issue a request now.

        A caller that receives True is expected to report the outcome through
        ``record_success`` or ``record_failure``.
        """
        with self.lock:
            if self.state == self.CLOSED:
                return True
            now = time.time()
            if self.state == self.OPEN:
                if now - self.last_failure_time > self.reset_timeout:
                    self.state = self.HALF_OPEN
                    self._probe_started_at = now
                    return True
                return False
            # HALF_OPEN: allow one request at a time. If an earlier probe never
            # reported back within reset_timeout, treat it as lost and admit a
            # new one so the breaker cannot get stuck.
            probe_outstanding = (
                self._probe_started_at is not None
                and now - self._probe_started_at <= self.reset_timeout
            )
            if probe_outstanding:
                return False
            self._probe_started_at = now
            return True

    def record_success(self):
        """Report a successful request: reset the count and close the breaker."""
        with self.lock:
            self.failure_count = 0
            self.state = self.CLOSED
            self._probe_started_at = None

    def record_failure(self):
        """Report a failed request; may trip (or re-trip) the breaker."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed probe re-opens immediately, regardless of the count.
            if self.state == self.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = self.OPEN
                self._probe_started_at = None
class ResilientSolver:
    """Solver wrapper that short-circuits calls while the API looks degraded.

    Subclasses supply the actual solving logic by overriding ``_do_solve``.
    """

    def __init__(self, api_key):
        """Store the API key and create the breaker guarding ``solve``."""
        self.api_key = api_key
        self.breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)

    def solve(self, params):
        """Run ``_do_solve`` under the circuit breaker and return its result.

        Raises ``Exception`` without calling ``_do_solve`` when the breaker
        rejects the request. Any exception from ``_do_solve`` is recorded as
        a failure and re-raised unchanged.
        """
        if not self.breaker.can_proceed():
            raise Exception("Circuit open — API degraded, try later")
        try:
            result = self._do_solve(params)
        except Exception:
            self.breaker.record_failure()
            raise
        self.breaker.record_success()
        return result

    def _do_solve(self, params):
        """Perform one solve; must be overridden with the standard solve logic.

        Raising here (rather than silently returning None) keeps an
        unimplemented solver from being counted as a success and from
        handing ``None`` to callers as if it were a token.
        """
        raise NotImplementedError("ResilientSolver._do_solve must be overridden")
Pattern 4: Pre-Solving with Token Buffer
Maintain a buffer of pre-solved tokens for instant use.
import queue
import threading
import time
class TokenBuffer:
    """Keep a small queue of pre-solved tokens filled by a background thread.

    Tokens are stored with their creation time; ``get_token`` discards any
    that are older than ``ttl_seconds``.
    """

    def __init__(self, solver, params, buffer_size=5, ttl_seconds=90):
        """Configure the buffer; nothing is solved until ``start`` is called.

        solver: object whose ``solve(params)`` returns a token.
        params: the task parameters passed to every ``solve`` call.
        buffer_size: maximum number of tokens held at once.
        ttl_seconds: age after which a buffered token is considered expired.
        """
        self.solver = solver
        self.params = params
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.ttl = ttl_seconds
        self.buffer_size = buffer_size
        self._running = False

    def start(self):
        """Start the daemon thread that keeps the buffer topped up."""
        self._running = True
        threading.Thread(target=self._fill_loop, daemon=True).start()

    def stop(self):
        """Ask the fill thread to exit on its next loop iteration."""
        self._running = False

    def get_token(self, timeout=30):
        """Get a pre-solved token, waiting at most ``timeout`` seconds in total.

        Expired tokens are skipped. Returns None if no unexpired token becomes
        available in time. ``timeout=None`` waits indefinitely.
        """
        # One deadline for the whole call: skipping an expired token must not
        # restart the wait, otherwise the call could block for many timeouts.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            if deadline is None:
                remaining = None
            else:
                remaining = max(deadline - time.monotonic(), 0.0)
            try:
                token, created_at = self.buffer.get(timeout=remaining)
            except queue.Empty:
                return None
            if time.time() - created_at <= self.ttl:
                return token
            # Token expired, try next

    def _fill_loop(self):
        """Background loop: solve and enqueue tokens while there is room."""
        while self._running:
            if self.buffer.qsize() < self.buffer_size:
                try:
                    token = self.solver.solve(self.params)
                    self.buffer.put((token, time.time()))
                except Exception:
                    # Solver failed; back off before retrying.
                    time.sleep(5)
            else:
                time.sleep(2)
Pattern 5: Multi-Provider Failover
Route to backup providers when primary fails.
class MultiProviderSolver:
    """Try solving providers in priority order, each behind its own breaker."""

    def __init__(self, providers):
        """providers: iterable of (name, solver_instance, priority) tuples.

        Providers are tried in ascending ``priority`` order.
        """
        self.providers = sorted(providers, key=lambda x: x[2])
        # Build from the sorted list, not the argument: a one-shot iterator
        # would already be exhausted by sorted() and yield no breakers.
        self.breakers = {name: CircuitBreaker() for name, _, _ in self.providers}

    def solve(self, params):
        """Return the first successful provider result.

        Providers whose breaker rejects the request are skipped. Raises
        ``Exception`` listing why each provider was unavailable when none
        succeeds.
        """
        errors = []
        for name, solver, _ in self.providers:
            breaker = self.breakers[name]
            if not breaker.can_proceed():
                # Record the skip so the final error explains every provider.
                errors.append(f"{name}: circuit open")
                continue
            try:
                result = solver.solve(params)
            except Exception as e:
                breaker.record_failure()
                errors.append(f"{name}: {e}")
            else:
                breaker.record_success()
                return result
        detail = "; ".join(errors) or "no providers configured"
        raise Exception(f"All providers failed: {detail}")
Scaling Guidelines
| Volume | Architecture | Workers | Notes |
|---|---|---|---|
| < 100/hr | Direct calls | 1-3 | No special architecture needed |
| 100-1K/hr | Worker pool | 5-10 | Pattern 1 |
| 1K-10K/hr | Queue pipeline | 10-30 | Pattern 2 + circuit breaker |
| 10K-50K/hr | Distributed queues | 30-100 | Redis/RabbitMQ, multiple machines |
| 50K+/hr | Multi-provider | 100+ | Pattern 5 + distributed queue |
Monitoring at Scale
import logging
from collections import defaultdict
logger = logging.getLogger("captcha_scale")


class ScaleMetrics:
    """Accumulate per-CAPTCHA-type solve counts and timings and log a summary."""

    def __init__(self):
        # Keys are "<captcha_type>_ok" / "<captcha_type>_fail".
        self.counts = defaultdict(int)
        # Every recorded elapsed time (successes and failures), per type.
        self.times = defaultdict(list)

    def record(self, captcha_type, success, elapsed):
        """Record one solve attempt of ``captcha_type`` that took ``elapsed`` seconds."""
        outcome = "ok" if success else "fail"
        self.counts[f"{captcha_type}_{outcome}"] += 1
        self.times[captcha_type].append(elapsed)

    def report(self):
        """Log one INFO line per CAPTCHA type: total, success rate, average time.

        Types are reported in sorted order so successive reports are
        comparable line by line.
        """
        # rsplit from the right so type names containing "_" stay intact.
        captcha_types = sorted({key.rsplit("_", 1)[0] for key in self.counts})
        for captcha_type in captcha_types:
            ok = self.counts.get(f"{captcha_type}_ok", 0)
            fail = self.counts.get(f"{captcha_type}_fail", 0)
            total = ok + fail
            rate = (ok / total * 100) if total else 0
            times = self.times.get(captcha_type, [])
            avg_time = sum(times) / len(times) if times else 0
            # Lazy %-formatting: the message is only built if INFO is enabled.
            logger.info(
                "%s: %d solves, %.1f%% success, %.1fs avg",
                captcha_type,
                total,
                rate,
                avg_time,
            )
FAQ
How many concurrent solves can CaptchaAI handle?
CaptchaAI can handle high concurrency. Start with 10-20 concurrent requests and increase based on your results.
Should I use async or threads?
Threads work well since CAPTCHA solving is I/O-bound (network requests + waiting). For Python 3.7+, asyncio with aiohttp is also excellent for very high concurrency.
How do I prevent overwhelming the API?
Use a queue with bounded size (back-pressure), rate limiting, and circuit breakers. Monitor your ERROR_NO_SLOT_AVAILABLE rate.
Related Guides
Scale to any volume — start with CaptchaAI.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In
No comments yet.