CAPTCHA solving is I/O-bound (waiting for the API), but the surrounding scraping work — HTML parsing, data extraction, file writing — benefits from multiple processes. Multiprocessing bypasses Python's GIL and enables true parallel execution.
## When to use multiprocessing vs threading vs asyncio
| Approach | Best for | GIL-free | Overhead |
|---|---|---|---|
| Threading | Pure I/O waits | No | Low |
| Asyncio | Many concurrent I/O tasks | No | Low |
| Multiprocessing | CPU + I/O mixed workloads | Yes | Higher |
Use multiprocessing when each job includes both CAPTCHA solving (I/O) and heavy data processing (CPU).
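For example, a typical scraping job mixes both kinds of work. A minimal sketch, using the `solve_captcha` helper defined in the next section; `fetch_with_token` and `parse_page` are hypothetical placeholders for your own fetching and extraction code:

```python
def scrape_job(task):
    """One mixed job: I/O-bound solving plus CPU-bound parsing."""
    result = solve_captcha(task)                    # I/O-bound: waits on the solver API
    if result["status"] != "solved":
        return result
    html = fetch_with_token(task, result["token"])  # I/O-bound: fetch the protected page
    return parse_page(html)                         # CPU-bound: parallelizes across processes
```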
## ProcessPoolExecutor — simplest pattern
```python
import time
import requests
from concurrent.futures import ProcessPoolExecutor, as_completed

API_KEY = "YOUR_API_KEY"

def solve_captcha(task):
    """Solve a single CAPTCHA — runs in a separate process."""
    method = task["method"]
    params = task["params"]
    submit = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY, "method": method, "json": 1, **params,
    }, timeout=30).json()
    if submit.get("status") != 1:
        return {"task_id": task["id"], "status": "error", "error": submit.get("request")}
    captcha_id = submit["request"]
    for _ in range(30):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1,
        }, timeout=30).json()
        if result.get("status") == 1:
            return {"task_id": task["id"], "status": "solved", "token": result["request"]}
        if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
            return {"task_id": task["id"], "status": "error", "error": "unsolvable"}
    return {"task_id": task["id"], "status": "error", "error": "timeout"}

def solve_batch(tasks, max_workers=4):
    """Solve multiple CAPTCHAs in parallel processes."""
    results = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(solve_captcha, task): task for task in tasks}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            print(f"Task {result['task_id']}: {result['status']}")
    return results

# Usage — the __main__ guard is required: on platforms that spawn workers
# (Windows, macOS), child processes re-import this module, and unguarded
# top-level code would recursively create new pools.
if __name__ == "__main__":
    tasks = [
        {"id": i, "method": "userrecaptcha", "params": {"googlekey": f"KEY_{i}", "pageurl": f"https://example.com/{i}"}}
        for i in range(10)
    ]
    results = solve_batch(tasks, max_workers=4)
    solved = [r for r in results if r["status"] == "solved"]
    print(f"Solved: {len(solved)}/{len(tasks)}")
```
## multiprocessing.Pool with map
```python
import time
import requests
from multiprocessing import Pool

API_KEY = "YOUR_API_KEY"

def solve_single(args):
    """Worker function for Pool.map — must accept a single argument."""
    task_id, method, sitekey, url = args
    try:
        submit = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": API_KEY, "method": method, "googlekey": sitekey,
            "pageurl": url, "json": 1,
        }, timeout=30).json()
        if submit.get("status") != 1:
            return (task_id, None, submit.get("request"))
        captcha_id = submit["request"]
        for _ in range(30):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": API_KEY, "action": "get", "id": captcha_id, "json": 1,
            }, timeout=30).json()
            if result.get("status") == 1:
                return (task_id, result["request"], None)
        return (task_id, None, "timeout")
    except Exception as e:
        return (task_id, None, str(e))

def solve_with_pool(work_items, num_processes=4):
    """Use Pool.map for simple parallel execution."""
    with Pool(processes=num_processes) as pool:
        results = pool.map(solve_single, work_items)
    for task_id, token, error in results:
        if token:
            print(f"Task {task_id}: solved")
        else:
            print(f"Task {task_id}: {error}")
    return results

# Usage
if __name__ == "__main__":
    work = [
        (i, "userrecaptcha", f"SITEKEY_{i}", f"https://example.com/page{i}")
        for i in range(8)
    ]
    results = solve_with_pool(work, num_processes=4)
```
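`Pool.map` blocks until the whole batch finishes and returns results in input order. To act on each result as soon as its worker returns, `Pool.imap_unordered` yields results in completion order instead. A short sketch reusing `solve_single`:

```python
def solve_streaming(work_items, num_processes=4):
    """Print results as workers finish, not in submission order."""
    with Pool(processes=num_processes) as pool:
        for task_id, token, error in pool.imap_unordered(solve_single, work_items):
            print(f"Task {task_id}: {'solved' if token else error}")
```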
## Shared state with Manager
When processes need to share state (counters, results dict):
```python
from multiprocessing import Process, Manager
import time
import requests

API_KEY = "YOUR_API_KEY"

def worker(task_queue, result_dict, counter, lock, api_key):
    """Worker process with shared state."""
    while True:
        try:
            task = task_queue.get(timeout=5)
        except Exception:
            break
        if task is None:  # Poison pill
            break
        try:
            # Solve CAPTCHA
            submit = requests.post("https://ocr.captchaai.com/in.php", data={
                "key": api_key, "method": task["method"], "json": 1, **task["params"],
            }, timeout=30).json()
            if submit.get("status") != 1:
                result_dict[task["id"]] = {"error": submit.get("request")}
                continue
            captcha_id = submit["request"]
            for _ in range(30):
                time.sleep(5)
                result = requests.get("https://ocr.captchaai.com/res.php", params={
                    "key": api_key, "action": "get", "id": captcha_id, "json": 1,
                }, timeout=30).json()
                if result.get("status") == 1:
                    result_dict[task["id"]] = {"token": result["request"]}
                    with lock:  # read-modify-write on a shared dict is not atomic
                        counter["solved"] += 1
                    break
            else:
                result_dict[task["id"]] = {"error": "timeout"}
                with lock:
                    counter["failed"] += 1
        except Exception as e:
            result_dict[task["id"]] = {"error": str(e)}
            with lock:
                counter["failed"] += 1

def parallel_solve_with_manager(tasks, num_workers=4):
    """Parallel CAPTCHA solving with shared state."""
    with Manager() as manager:
        task_queue = manager.Queue()
        result_dict = manager.dict()
        counter = manager.dict({"solved": 0, "failed": 0})
        lock = manager.Lock()  # guards the counter increments across processes
        # Load queue
        for task in tasks:
            task_queue.put(task)
        for _ in range(num_workers):
            task_queue.put(None)  # Poison pills
        # Start workers
        processes = []
        for _ in range(num_workers):
            p = Process(target=worker, args=(task_queue, result_dict, counter, lock, API_KEY))
            p.start()
            processes.append(p)
        # Wait for all workers
        for p in processes:
            p.join()
        print(f"Solved: {counter['solved']}, Failed: {counter['failed']}")
        return dict(result_dict)
```
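A usage sketch, mirroring the earlier sections (the `__main__` guard again prevents spawn-based platforms from re-running this block in every worker):

```python
if __name__ == "__main__":
    tasks = [
        {"id": i, "method": "userrecaptcha", "params": {"googlekey": f"KEY_{i}", "pageurl": f"https://example.com/{i}"}}
        for i in range(10)
    ]
    results = parallel_solve_with_manager(tasks, num_workers=4)
```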
## Hybrid: multiprocessing + asyncio
The most powerful pattern — multiple processes, each running an async event loop:
```python
import asyncio
import aiohttp
from multiprocessing import Pool

API_KEY = "YOUR_API_KEY"

async def solve_async_batch(api_key, tasks):
    """Async solver running inside a process."""
    semaphore = asyncio.Semaphore(5)  # caps in-flight solves per process

    async def solve_one(task):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.post("https://ocr.captchaai.com/in.php", data={
                    "key": api_key, "method": task["method"], "json": 1, **task["params"],
                }) as resp:
                    data = await resp.json(content_type=None)
                if data.get("status") != 1:
                    return {"id": task["id"], "error": data.get("request")}
                captcha_id = data["request"]
                for _ in range(30):
                    await asyncio.sleep(5)
                    async with session.get("https://ocr.captchaai.com/res.php", params={
                        "key": api_key, "action": "get", "id": captcha_id, "json": 1,
                    }) as resp:
                        result = await resp.json(content_type=None)
                    if result.get("status") == 1:
                        return {"id": task["id"], "token": result["request"]}
                return {"id": task["id"], "error": "timeout"}

    return await asyncio.gather(*[solve_one(t) for t in tasks])

def process_chunk(args):
    """Entry point for each process — runs an async event loop."""
    api_key, chunk = args
    return asyncio.run(solve_async_batch(api_key, chunk))

def hybrid_solve(tasks, num_processes=4, batch_size=10):
    """Split tasks across processes, each running async solvers."""
    # Split into chunks
    chunks = [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]
    chunk_args = [(API_KEY, chunk) for chunk in chunks]
    with Pool(processes=num_processes) as pool:
        all_results = pool.map(process_chunk, chunk_args)
    # Flatten results
    flat = [r for batch in all_results for r in batch]
    solved = sum(1 for r in flat if "token" in r)
    print(f"Total: {len(flat)}, Solved: {solved}")
    return flat

# Usage
if __name__ == "__main__":
    tasks = [
        {"id": i, "method": "userrecaptcha", "params": {"googlekey": f"KEY_{i}", "pageurl": f"https://example.com/{i}"}}
        for i in range(40)
    ]
    results = hybrid_solve(tasks, num_processes=4, batch_size=10)
```
## Choosing the right concurrency level

- CaptchaAI plan limit → max total concurrent solves
- Number of CPU cores → max processes (multiprocessing)
- Tasks per process → max async tasks per event loop
- API response time → determines throughput ceiling

Example: 4-core machine, API limit 20 concurrent → 4 processes × 5 async tasks = 20 concurrent solves.
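A small helper can derive these numbers from your plan. A sketch, assuming the API limit is the binding constraint; `concurrency_plan` is a name introduced here for illustration:

```python
import os

def concurrency_plan(api_limit, max_processes=None):
    """Split an API concurrency limit across processes.

    Returns (num_processes, async_tasks_per_process) such that their
    product does not exceed api_limit.
    """
    cores = max_processes or os.cpu_count() or 1
    num_processes = min(cores, api_limit)
    per_process = max(1, api_limit // num_processes)
    return num_processes, per_process

# e.g. 4-core machine, plan limit of 20 concurrent solves -> (4, 5)
print(concurrency_plan(20, max_processes=4))
```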
## Troubleshooting

| Symptom | Cause | Fix |
|---|---|---|
| `Can't pickle` error | Lambda or non-picklable object passed to Pool | Use module-level functions only |
| Processes hang on `join()` | Worker stuck in infinite loop | Add timeout to `queue.get()` |
| Results missing | Shared dict not synced | Use `Manager().dict()` |
| High memory usage | Too many processes | Reduce `num_processes` |
| `ERROR_NO_SLOT_AVAILABLE` | Exceeding API concurrency | Lower total concurrent workers |
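The pickle error is the most common of these: Pool serializes the worker function by reference, so inline lambdas and closures fail while module-level functions work. A minimal illustration:

```python
from multiprocessing import Pool

def double(x):  # module-level: picklable, works with Pool
    return x * 2

if __name__ == "__main__":
    with Pool(2) as pool:
        print(pool.map(double, [1, 2, 3]))      # [2, 4, 6]
        # pool.map(lambda x: x * 2, [1, 2, 3])  # raises PicklingError: can't pickle a lambda
```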
## Frequently asked questions

**Is multiprocessing faster than asyncio for CAPTCHA solving?**

Not for pure CAPTCHA solving, which is I/O-bound. Multiprocessing shines when you also do CPU-heavy work like data parsing or image processing alongside solving.

**How many processes should I use?**

Match your CPU core count for CPU-bound work. For I/O-bound CAPTCHA solving, 2-4 processes with async solvers inside each usually works well.

**Can I share a requests.Session across processes?**

No — each process needs its own Session; Sessions are not process-safe. See the sketch below for giving each worker its own.
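One pattern for connection reuse despite that restriction is to create a per-process `Session` through the pool's `initializer` hook. A sketch, with a hypothetical `_solve_with_session` worker:

```python
import requests
from concurrent.futures import ProcessPoolExecutor

_session = None  # one Session per worker process, created after fork/spawn

def _init_worker():
    global _session
    _session = requests.Session()

def _solve_with_session(payload):
    # _session is local to this process and safe to reuse across its tasks
    return _session.post("https://ocr.captchaai.com/in.php", data=payload, timeout=30).json()

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4, initializer=_init_worker) as executor:
        print(list(executor.map(_solve_with_session, [{"key": "YOUR_API_KEY"}])))
```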
## Summary
Python multiprocessing enables true parallel CAPTCHA solving with CaptchaAI. Use ProcessPoolExecutor for simple cases, Manager for shared state, or the hybrid multiprocessing + asyncio pattern for maximum throughput.