asyncio is powerful but requires rewriting your entire call chain as async. ThreadPoolExecutor gives you parallelism with standard synchronous code — drop it into existing projects without restructuring.
## Why ThreadPoolExecutor for CAPTCHAs
CAPTCHA solving is I/O-bound (waiting for HTTP responses). Python threads release the GIL during I/O operations, making ThreadPoolExecutor efficient for this workload:
| Approach | Complexity | Fits existing code | Parallelism for I/O |
|---|---|---|---|
| Sequential | None | Yes | None |
| ThreadPoolExecutor | Low | Yes | Good |
| asyncio | High | Requires async rewrite | Best |
| multiprocessing | Medium | Mostly | Overkill for I/O |
## Basic Implementation

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

def solve_captcha(sitekey, pageurl):
    """Synchronous CAPTCHA solve — submit and poll."""
    # Submit
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()
    if data.get("status") != 1:
        raise RuntimeError(data.get("request", "Submit failed"))
    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()
        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            raise RuntimeError(result.get("request", "Unknown error"))
    raise TimeoutError("Solve timeout after 300s")

# Batch solve with ThreadPoolExecutor
tasks = [
    {"sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "pageurl": f"https://example.com/page/{i}"}
    for i in range(20)
]

start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(solve_captcha, t["sitekey"], t["pageurl"]): t
        for t in tasks
    }
    solved = 0
    failed = 0
    for future in as_completed(futures):
        task = futures[future]
        try:
            solution = future.result()
            solved += 1
            print(f"[OK] {task['pageurl']}: {solution[:30]}...")
        except Exception as e:
            failed += 1
            print(f"[ERR] {task['pageurl']}: {e}")

elapsed = time.time() - start
print(f"\nDone: {solved} solved, {failed} failed in {elapsed:.1f}s")
```
## Using Session for Connection Reuse

Creating a new TCP connection per request wastes time. Share a `requests.Session` per thread:

```python
import threading

# Thread-local storage for sessions
thread_local = threading.local()

def get_session():
    """Get or create a thread-local session."""
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
        # Configure connection pooling
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=2
        )
        thread_local.session.mount("https://", adapter)
    return thread_local.session

def solve_captcha_pooled(sitekey, pageurl):
    """Solve using thread-local connection pooling."""
    session = get_session()
    resp = session.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()
    if data.get("status") != 1:
        raise RuntimeError(data.get("request"))
    captcha_id = data["request"]

    for _ in range(60):
        time.sleep(5)
        result = session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()
        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            raise RuntimeError(result.get("request"))
    raise TimeoutError("Solve timeout")
```
## map() for Simple Batch Operations

`executor.map()` returns results in input order with less boilerplate, but iterating it re-raises the first exception a task hits. Wrap each task so errors come back as data instead:

```python
def solve_task(task):
    """Wrapper that returns a result dict instead of raising."""
    try:
        solution = solve_captcha_pooled(task["sitekey"], task["pageurl"])
        return {"url": task["pageurl"], "solution": solution, "error": None}
    except Exception as e:
        return {"url": task["pageurl"], "solution": None, "error": str(e)}

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_task, tasks))

solved = [r for r in results if r["solution"]]
failed = [r for r in results if r["error"]]
print(f"Solved: {len(solved)}, Failed: {len(failed)}")
```
## Timeout Protection

Prevent a stalled batch from blocking your pipeline indefinitely. Note that `as_completed` only yields futures that have already finished, so a per-future `result(timeout=...)` inside the loop can never fire; the protection comes from the timeout on `as_completed` itself, which raises from the `for` statement:

```python
from concurrent.futures import TimeoutError as FuturesTimeout

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(solve_captcha_pooled, t["sitekey"], t["pageurl"]): t
        for t in tasks
    }
    try:
        for future in as_completed(futures, timeout=600):  # 10 min for the whole batch
            task = futures[future]
            try:
                solution = future.result()  # already finished; returns immediately
                print(f"[OK] {task['pageurl']}")
            except Exception as e:
                print(f"[ERR] {task['pageurl']}: {e}")
    except FuturesTimeout:
        pending = [t["pageurl"] for f, t in futures.items() if not f.done()]
        print(f"[TIMEOUT] {len(pending)} tasks unfinished: {pending}")
```
## Progress Callback

Track completion in real time:

```python
import threading

progress_lock = threading.Lock()
progress = {"done": 0, "total": 0}

def solve_with_progress(task):
    result = solve_task(task)
    with progress_lock:
        progress["done"] += 1
        pct = progress["done"] / progress["total"] * 100
        print(f'\rProgress: {progress["done"]}/{progress["total"]} ({pct:.0f}%)', end="")
    return result

progress["total"] = len(tasks)
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_with_progress, tasks))
print()  # Newline after progress line
```
## Choosing max_workers
| Workers | Concurrent solves | Overhead | Best for |
|---|---|---|---|
| 5 | 5 | Very low | Small batches, conservative use |
| 10 | 10 | Low | General use |
| 25 | 25 | Moderate | High-volume pipelines |
| 50 | 50 | Higher | Maximum throughput |
More workers means more concurrent API connections. Start at 10, increase while monitoring error rates.
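To pick a number empirically, you can benchmark a stand-in I/O task at several pool sizes before spending real API credits. A minimal sketch, where the hypothetical `fake_solve` just sleeps to simulate a blocking HTTP call:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_solve(_):
    # Simulated I/O wait standing in for a real CAPTCHA solve
    time.sleep(0.1)

def benchmark(workers, n_tasks=50):
    """Time how long n_tasks simulated solves take at a given pool size."""
    start = time.time()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(fake_solve, range(n_tasks)))
    return time.time() - start

for w in (5, 10, 25):
    print(f"{w:>3} workers: {benchmark(w):.2f}s")
```

Wall time should drop as workers increase, until the pool size exceeds the task count or the API starts rejecting connections.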
## ThreadPoolExecutor vs asyncio

```python
# ThreadPoolExecutor — drop into existing sync code
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_task, tasks))

# asyncio — requires an async function chain end to end
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [solve_async(session, t) for t in task_list]
        results = await asyncio.gather(*tasks)
```
Use ThreadPoolExecutor when:
- Your existing codebase is synchronous
- You use libraries that don't support async (Selenium, some ORMs)
- You want quick parallelism without restructuring
Use asyncio when:
- Building from scratch
- Maximum efficiency matters (fewer OS threads)
- Already in an async framework (FastAPI, aiohttp)
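The two approaches can also be combined: if part of your stack is already async, `loop.run_in_executor` schedules the same synchronous solver on a thread pool from inside a coroutine. A minimal sketch, with a trivial `sync_solve` standing in for a blocking function like `solve_captcha`:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def sync_solve(i):
    # Stand-in for a blocking function like solve_captcha
    return i * 2

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=10) as pool:
        # Each call runs on a pool thread; the event loop stays free
        futures = [loop.run_in_executor(pool, sync_solve, i) for i in range(5)]
        return await asyncio.gather(*futures)

results = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```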
## Troubleshooting

| Issue | Cause | Fix |
|---|---|---|
| All threads blocked | Every thread waiting on time.sleep during polling | This is expected; threads release the GIL during sleep |
| ConnectionError spikes | Too many concurrent connections | Reduce max_workers; use connection pooling |
| Results out of order | as_completed returns in completion order | Use map() for ordered results, or track with a dict |
| Memory growing | Large result objects held in futures | Process results in the as_completed loop; don't store all |
## FAQ

### Does the GIL prevent real parallelism?
No — for I/O-bound work like HTTP requests and `time.sleep`, Python releases the GIL. Your threads run truly concurrently during network calls. The GIL only limits CPU-bound parallelism.
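You can verify this yourself: ten half-second waits spread over ten threads finish in roughly half a second, not five seconds, because each sleeping thread releases the GIL:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_wait(_):
    time.sleep(0.5)  # releases the GIL, like a blocking HTTP call

start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(io_wait, range(10)))
elapsed = time.time() - start
print(f"10 x 0.5s waits took {elapsed:.2f}s")  # ~0.5s, not 5s
```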
### How many CAPTCHAs can ThreadPoolExecutor handle per hour?
With 10 workers and 15-second average solve time: ~2,400 per hour. With 25 workers: ~6,000 per hour. The bottleneck is CaptchaAI solve time, not Python threading.
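Those figures follow directly from pool size and average solve latency; a quick helper makes the arithmetic explicit:

```python
def solves_per_hour(workers, avg_solve_seconds):
    # Each worker completes 3600 / avg_solve_seconds solves per hour
    return int(workers * 3600 / avg_solve_seconds)

print(solves_per_hour(10, 15))  # 2400
print(solves_per_hour(25, 15))  # 6000
```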
### Should I use ProcessPoolExecutor instead?
No. CAPTCHA solving is I/O-bound. ProcessPoolExecutor adds inter-process communication overhead with no benefit. Stick with threads.
## Next Steps
Parallelize CAPTCHA solving — get your CaptchaAI API key and drop ThreadPoolExecutor into your pipeline.