When scraping hundreds or thousands of pages, solving CAPTCHAs one at a time is slow. Submit multiple tasks in parallel, poll them concurrently, and process results as they arrive.
Sequential vs Parallel
Sequential (slow):
Submit #1 → Poll → Result (15s)
Submit #2 → Poll → Result (15s)
Submit #3 → Poll → Result (15s)
Total: ~45s for 3 solves
Parallel (fast):
Submit #1 ─┐
Submit #2 ─┤→ Poll all → Results arrive
Submit #3 ─┘
Total: ~15s for 3 solves
Basic Concurrent Solving
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"
def submit_task(method, **params):
    """Submit a single CAPTCHA task to the /in.php endpoint.

    Args:
        method: API method name (e.g. "userrecaptcha").
        **params: Additional form fields (googlekey, pageurl, ...).

    Returns:
        The task ID string assigned by the service.

    Raises:
        RuntimeError: If the API rejects the submission (status != 1).
    """
    payload = {"key": API_KEY, "method": method, "json": 1, **params}
    response = requests.post(f"{BASE_URL}/in.php", data=payload, timeout=30)
    body = response.json()
    if body.get("status") == 1:
        return body["request"]
    raise RuntimeError(f"Submit error: {body.get('request')}")
def poll_result(task_id, timeout=120):
    """Poll /res.php until the solve for *task_id* is ready.

    Args:
        task_id: ID returned by submit_task().
        timeout: Maximum seconds to wait before giving up.

    Returns:
        The solved token string.

    Raises:
        RuntimeError: If the API reports a terminal error
            (e.g. "ERROR_CAPTCHA_UNSOLVABLE").
        TimeoutError: If no result arrives within *timeout* seconds.
    """
    start = time.time()
    while time.time() - start < timeout:
        time.sleep(5)  # API expects ~5s between polls
        resp = requests.get(f"{BASE_URL}/res.php", params={
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }, timeout=15)
        answer = resp.json()["request"]
        if answer == "CAPCHA_NOT_READY":  # literal spelling used by the API
            continue
        # Fixed: error strings ("ERROR_*") were previously returned to the
        # caller as if they were valid tokens; raise them instead.
        if isinstance(answer, str) and answer.startswith("ERROR_"):
            raise RuntimeError(f"Task {task_id} failed: {answer}")
        return answer
    raise TimeoutError(f"Task {task_id} timeout")
def solve_one(sitekey, pageurl):
    """Submit one reCAPTCHA task and block until its token is ready.

    Returns a dict with the page URL and the solved token.
    """
    tid = submit_task("userrecaptcha", googlekey=sitekey, pageurl=pageurl)
    return {"url": pageurl, "token": poll_result(tid)}
def batch_solve(tasks, max_workers=10):
    """Solve multiple CAPTCHAs in parallel with a thread pool.

    Args:
        tasks: List of dicts, each with "sitekey" and "url" keys.
        max_workers: Maximum number of concurrent solves.

    Returns:
        List of result dicts; failures carry token=None plus an "error" key.
    """
    outcomes = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its task spec so failures can be attributed.
        in_flight = {
            pool.submit(solve_one, spec["sitekey"], spec["url"]): spec
            for spec in tasks
        }
        for done in as_completed(in_flight):
            spec = in_flight[done]
            try:
                solved = done.result()
            except Exception as exc:
                print(f"Failed: {spec['url']} - {exc}")
                outcomes.append({"url": spec["url"], "token": None, "error": str(exc)})
            else:
                outcomes.append(solved)
                print(f"Solved: {solved['url']}")
    return outcomes
# Usage: each entry pairs a page URL with that page's reCAPTCHA sitekey.
tasks = [
    {"sitekey": "SITE_KEY_1", "url": "https://example.com/page1"},
    {"sitekey": "SITE_KEY_2", "url": "https://example.com/page2"},
    {"sitekey": "SITE_KEY_3", "url": "https://example.com/page3"},
]
results = batch_solve(tasks, max_workers=5)
# Count entries that actually carry a token (failures have token=None).
print(f"Solved {sum(1 for r in results if r.get('token'))}/{len(tasks)}")
Async Batch Solver
For higher concurrency, use asyncio:
import asyncio
import aiohttp
import time
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"
async def submit_task_async(session, method, **params):
    """Submit a single CAPTCHA task via an aiohttp session.

    Raises RuntimeError if the API rejects the submission; otherwise
    returns the assigned task ID string.
    """
    form = {"key": API_KEY, "method": method, "json": 1, **params}
    async with session.post(f"{BASE_URL}/in.php", data=form) as resp:
        body = await resp.json()
    if body.get("status") == 1:
        return body["request"]
    raise RuntimeError(f"Submit error: {body.get('request')}")
async def poll_result_async(session, task_id, timeout=120):
    """Poll /res.php asynchronously until the solve for *task_id* is ready.

    Args:
        session: An open aiohttp.ClientSession.
        task_id: ID returned by submit_task_async().
        timeout: Maximum seconds to wait before giving up.

    Returns:
        The solved token string.

    Raises:
        RuntimeError: If the API reports a terminal error ("ERROR_*").
        TimeoutError: If no result arrives within *timeout* seconds.
    """
    start = time.time()
    while time.time() - start < timeout:
        await asyncio.sleep(5)  # non-blocking wait between polls
        params = {
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }
        async with session.get(f"{BASE_URL}/res.php", params=params) as resp:
            data = await resp.json()
        answer = data["request"]
        if answer == "CAPCHA_NOT_READY":  # literal spelling used by the API
            continue
        # Fixed (matching the sync version): error strings were previously
        # returned as if they were valid tokens; raise them instead.
        if isinstance(answer, str) and answer.startswith("ERROR_"):
            raise RuntimeError(f"Task {task_id} failed: {answer}")
        return answer
    raise TimeoutError(f"Task {task_id} timeout")
async def solve_one_async(session, sitekey, pageurl):
    """Submit one reCAPTCHA task and await its token.

    Returns a dict with the page URL and the solved token.
    """
    tid = await submit_task_async(
        session, "userrecaptcha",
        googlekey=sitekey, pageurl=pageurl,
    )
    return {"url": pageurl, "token": await poll_result_async(session, tid)}
async def batch_solve_async(tasks, max_concurrent=20):
    """Solve many CAPTCHAs concurrently with asyncio.

    Args:
        tasks: List of dicts, each with "sitekey" and "url" keys.
        max_concurrent: Maximum number of in-flight solves.

    Returns:
        List of result dicts, in task order; failures carry token=None
        plus an "error" key.
    """
    # Semaphore caps concurrency so the API isn't flooded even though
    # every coroutine is scheduled at once.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def solve_with_limit(task):
        async with semaphore:
            try:
                # `session` is resolved at call time from the enclosing
                # scope; the coroutines only run inside the `async with`.
                return await solve_one_async(
                    session, task["sitekey"], task["url"],
                )
            except Exception as e:
                return {"url": task["url"], "token": None, "error": str(e)}

    async with aiohttp.ClientSession() as session:
        # Fixed: removed the dead `results = []` that was immediately
        # shadowed by asyncio.gather's return value.
        return await asyncio.gather(*(solve_with_limit(t) for t in tasks))
# Usage: generate 50 task specs against the same sitekey, then solve them
# with at most 20 in flight at a time.
tasks = [
    {"sitekey": "KEY", "url": f"https://example.com/page{i}"}
    for i in range(50)
]
results = asyncio.run(batch_solve_async(tasks, max_concurrent=20))
# Failures have token=None, so this counts only successful solves.
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")
Submit-Then-Poll Pattern
For maximum throughput, separate submission from polling:
import requests
import time
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"
def batch_submit(tasks):
    """Submit all tasks first; return the accepted submissions.

    Args:
        tasks: List of dicts, each with "sitekey" and "url" keys.

    Returns:
        List of dicts with "task_id" and "url" for every accepted task.
        Rejected or failed submissions are reported and skipped.
    """
    submitted = []
    for task in tasks:
        try:
            data = {
                "key": API_KEY,
                "method": "userrecaptcha",
                "googlekey": task["sitekey"],
                "pageurl": task["url"],
                "json": 1,
            }
            resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
            result = resp.json()
            if result.get("status") == 1:
                submitted.append({
                    "task_id": result["request"],
                    "url": task["url"],
                })
            else:
                # Fixed: rejections (status != 1) were silently dropped,
                # making "Submitted N tasks" counts hard to debug.
                print(f"Submit rejected for {task['url']}: {result.get('request')}")
            time.sleep(0.1)  # Brief delay between submits (avoids rate limits)
        except Exception as e:
            print(f"Submit failed for {task['url']}: {e}")
    return submitted
def batch_poll(submitted, timeout=120):
    """Poll all submitted tasks until complete or *timeout* elapses.

    Args:
        submitted: List of dicts with "task_id" and "url" (from batch_submit).
        timeout: Overall deadline in seconds for the whole batch.

    Returns:
        List of result dicts; failures and timeouts carry token=None
        plus an "error" key.
    """
    pending = {s["task_id"]: s for s in submitted}
    results = []
    start = time.time()
    while pending and time.time() - start < timeout:
        time.sleep(5)
        # Iterate over a snapshot since we pop finished tasks mid-loop.
        for task_id in list(pending):
            try:
                resp = requests.get(f"{BASE_URL}/res.php", params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15)
                answer = resp.json()["request"]
            except Exception as e:
                # Fixed: transient network/JSON errors were swallowed with a
                # bare pass; log and retry the task on the next sweep.
                print(f"Poll error for task {task_id}: {e}")
                continue
            if answer == "CAPCHA_NOT_READY":  # literal spelling used by the API
                continue
            info = pending.pop(task_id)
            if isinstance(answer, str) and answer.startswith("ERROR_"):
                # Fixed: API error strings were recorded as if they were tokens.
                results.append({"url": info["url"], "token": None, "error": answer})
            else:
                results.append({"url": info["url"], "token": answer})
    # Mark anything still pending at the deadline as failed.
    for info in pending.values():
        results.append({"url": info["url"], "token": None, "error": "timeout"})
    return results
# Usage: submit all 20 tasks up front, then poll them as one batch.
tasks = [
    {"sitekey": "KEY", "url": f"https://example.com/page{i}"}
    for i in range(20)
]
submitted = batch_submit(tasks)
print(f"Submitted {len(submitted)} tasks")
results = batch_poll(submitted)
# Failures/timeouts have token=None, so this counts only real solves.
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")
Throughput Guide
| Concurrent Tasks | Approx Speed | Best For |
|---|---|---|
| 1-5 | 3-5 solves/min | Testing, light scraping |
| 5-20 | 15-60 solves/min | Production scraping |
| 20-50 | 60-150 solves/min | High-volume pipelines |
| 50-100 | 150-300 solves/min | Enterprise scale |
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Rate limit (429) | Too many submits/second | Add 100ms delay between submits |
| Many timeouts | Poll timeout too short | Increase to 120-180s |
| Diminishing returns above 50 concurrent | Network bottleneck | Use async (aiohttp) instead of threads |
| Results mixed up | Task ID tracking issue | Use dict keyed by task_id |
FAQ
How many tasks can I submit at once?
There's no hard limit on concurrent tasks. Start with 10-20 and increase based on your success rate and speed needs.
Should I use threads or async?
Threads (ThreadPoolExecutor) are simpler for small-medium batches (up to 50). For 100+ concurrent tasks, async (aiohttp) is more efficient.
Does batch solving cost more?
No. Each task costs the same whether submitted individually or in batch. Batch processing just saves time.
Related Guides
Scale your CAPTCHA solving — try CaptchaAI for high-throughput batch processing.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign In. No comments yet.