Before pushing your CAPTCHA solving pipeline to production, you need to know where it breaks. Load testing reveals your pipeline's maximum throughput, the concurrency level where errors spike, and which resources hit limits first. This guide shows how to run structured load tests against CaptchaAI.
What to Measure
| Metric | Why It Matters | Target |
|---|---|---|
| Throughput (solves/minute) | Capacity planning | Match your expected peak load |
| Error rate | Reliability | < 5% at target concurrency |
| P50/P90 solve time | User experience | Within the typical range for the CAPTCHA type |
| Memory usage | Resource planning | < 500 MB at peak |
| Connection count | Infrastructure limits | Within OS/proxy limits |
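The first three metrics in the table can be derived from three raw measurements: the list of solve times, the failure count, and the elapsed wall-clock time. The following is a minimal sketch, not part of the load test script; `percentile` and `summarize` are hypothetical helper names, and the sample numbers are invented for illustration. It uses the nearest-rank percentile definition, which can differ by one position from other percentile conventions on small samples.

```python
import math


def percentile(sorted_values, pct):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def summarize(solve_times, failures, elapsed_seconds):
    """Derive throughput, error rate, and P50/P90 from raw measurements."""
    successes = len(solve_times)
    total = successes + failures
    ordered = sorted(solve_times)
    return {
        "throughput_per_min": successes / (elapsed_seconds / 60) if elapsed_seconds > 0 else 0.0,
        "error_rate_pct": failures / total * 100 if total else 0.0,
        "p50": percentile(ordered, 50) if ordered else None,
        "p90": percentile(ordered, 90) if ordered else None,
    }


# Invented sample: 5 successful solves and 1 failure over 2 minutes
summary = summarize([12.0, 14.0, 15.0, 18.0, 30.0], failures=1, elapsed_seconds=120)
print(summary)
```

Keeping the calculation in one function makes it easier to compare runs, because every concurrency level is summarized the same way.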
Load Test Strategy
Ramp up concurrency in steps to find the inflection point where performance degrades:
Step 1: 5 concurrent (baseline)
Step 2: 10 concurrent
Step 3: 25 concurrent
Step 4: 50 concurrent
Step 5: 100 concurrent
Step 6: 200 concurrent (if steps 1-5 pass)
At each step, run for about 5 minutes, or for a fixed number of tasks if you are working to a solve budget, and record all metrics.
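Time-based steps consume more solves as concurrency rises, so it helps to estimate the volume before starting. A rough rule is solves ≈ concurrency × duration ÷ average solve time, which holds when every slot stays busy. The sketch below applies it to the ramp plan; the 20-second average solve time is an assumption for illustration, so substitute the figure you measure at the baseline step.

```python
RAMP_STEPS = [5, 10, 25, 50, 100, 200]  # concurrency steps from the ramp plan
STEP_DURATION_S = 5 * 60                # 5 minutes per step
ASSUMED_AVG_SOLVE_S = 20.0              # assumption: illustrative only, measure your own


def estimated_solves(concurrency, duration_s=STEP_DURATION_S, avg_solve_s=ASSUMED_AVG_SOLVE_S):
    """Approximate solves completed in one step when all slots stay busy."""
    return int(concurrency * duration_s / avg_solve_s)


plan = {step: estimated_solves(step) for step in RAMP_STEPS}
for step, solves in plan.items():
    print(f"concurrency={step:>3}: ~{solves} solves in {STEP_DURATION_S // 60} min")
print(f"total: ~{sum(plan.values())} solves")
```

Under these assumptions the full six-step ramp needs roughly 5,850 solves, which is why many teams cap each step by task count instead of by time.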
Python Load Test Script
# load_test_captcha.py
import asyncio
import os
import statistics
import time
import tracemalloc
from collections import Counter

import aiohttp

API_KEY = os.environ.get("CAPTCHAAI_KEY", "YOUR_API_KEY")
BASE_URL = "https://ocr.captchaai.com"

# Test parameters: adjust sitekey and pageurl to your target
TEST_SITEKEY = "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-"
TEST_PAGEURL = "https://www.google.com/recaptcha/api2/demo"


class LoadTestResults:
    """Collect metrics during load test."""

    def __init__(self):
        self.solve_times = []
        self.errors = []
        self.successes = 0
        self.failures = 0
        self.start_time = None

    def record_success(self, solve_time):
        self.solve_times.append(solve_time)
        self.successes += 1

    def record_failure(self, error):
        self.errors.append(error)
        self.failures += 1

    def report(self, concurrency):
        total = self.successes + self.failures
        elapsed = time.time() - self.start_time
        throughput = self.successes / (elapsed / 60) if elapsed > 0 else 0

        print(f"\n{'='*50}")
        print(f"Concurrency: {concurrency}")
        print(f"Duration: {elapsed:.0f}s")
        print(f"Total tasks: {total}")
        print(f"Successes: {self.successes}")
        print(f"Failures: {self.failures}")
        if total > 0:
            print(f"Error rate: {self.failures / total * 100:.1f}%")
        else:
            print("Error rate: N/A")
        print(f"Throughput: {throughput:.1f} solves/min")

        if self.solve_times:
            self.solve_times.sort()
            print(f"Solve time P50: {statistics.median(self.solve_times):.1f}s")
            print(f"Solve time P90: {self.solve_times[int(len(self.solve_times)*0.9)]:.1f}s")
            print(f"Solve time min: {min(self.solve_times):.1f}s")
            print(f"Solve time max: {max(self.solve_times):.1f}s")

        if self.errors:
            error_dist = Counter(self.errors)
            print(f"Error distribution: {dict(error_dist)}")

        # Peak is measured since tracemalloc.start(), so it carries over between levels
        mem = tracemalloc.get_traced_memory()
        print(f"Memory: current={mem[0]/1024/1024:.1f}MB, peak={mem[1]/1024/1024:.1f}MB")


async def solve_one(session, results, semaphore):
    """Single CAPTCHA solve for load testing."""
    async with semaphore:
        start = time.time()
        try:
            # Submit
            async with session.get(f"{BASE_URL}/in.php", params={
                "key": API_KEY, "method": "userrecaptcha",
                "googlekey": TEST_SITEKEY,
                "pageurl": TEST_PAGEURL, "json": "1",
            }) as resp:
                result = await resp.json(content_type=None)

            if result.get("status") != 1:
                results.record_failure(result.get("request", "UNKNOWN"))
                return

            task_id = result["request"]
            await asyncio.sleep(15)

            # Poll: up to 25 attempts, 5 seconds apart
            for _ in range(25):
                async with session.get(f"{BASE_URL}/res.php", params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": "1",
                }) as resp:
                    poll_result = await resp.json(content_type=None)

                if poll_result.get("status") == 1:
                    results.record_success(time.time() - start)
                    return
                if poll_result.get("request") != "CAPCHA_NOT_READY":
                    results.record_failure(poll_result.get("request", "UNKNOWN"))
                    return
                await asyncio.sleep(5)

            results.record_failure("TIMEOUT")
        except Exception as e:
            results.record_failure(str(e)[:50])


async def run_load_test(concurrency, total_tasks):
    """Run load test at specified concurrency."""
    results = LoadTestResults()
    results.start_time = time.time()
    semaphore = asyncio.Semaphore(concurrency)
    connector = aiohttp.TCPConnector(
        limit=concurrency, keepalive_timeout=60,
    )

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            solve_one(session, results, semaphore)
            for _ in range(total_tasks)
        ]
        await asyncio.gather(*tasks)

    results.report(concurrency)
    return results


async def main():
    tracemalloc.start()

    # Ramp up concurrency
    concurrency_levels = [5, 10, 25, 50]
    # Adjust based on budget. A level only reaches its nominal concurrency
    # when tasks_per_level is at least as large as the level itself.
    tasks_per_level = 20

    for level in concurrency_levels:
        print(f"\n>>> Starting load test: concurrency={level}, tasks={tasks_per_level}")
        results = await run_load_test(level, tasks_per_level)

        # Stop if error rate exceeds 20%
        total = results.successes + results.failures
        if total > 0 and results.failures / total > 0.20:
            print(f"\n!!! Error rate exceeded 20% at concurrency={level}. Stopping.")
            break

        # Cool down between levels
        await asyncio.sleep(10)

    tracemalloc.stop()


if __name__ == "__main__":
    asyncio.run(main())
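Before spending balance on real solves, it can be worth checking that the concurrency pattern itself behaves as expected. The sketch below reuses the same semaphore-plus-gather structure as the script but swaps the API calls for a stand-in that only sleeps. `fake_solve` and `dry_run` are hypothetical names, and the latency and failure ratio are made-up values, so the numbers it prints say nothing about CaptchaAI itself.

```python
import asyncio
import random
import time


async def fake_solve(semaphore, outcomes, fail_ratio=0.1):
    """Stand-in for a real solve: sleeps briefly, then records success or failure."""
    async with semaphore:
        start = time.monotonic()
        await asyncio.sleep(random.uniform(0.01, 0.03))  # simulated solve latency
        if random.random() < fail_ratio:
            outcomes.append(("fail", None))
        else:
            outcomes.append(("ok", time.monotonic() - start))


async def dry_run(concurrency, total_tasks, seed=0):
    """Run the semaphore-plus-gather pattern without any network calls."""
    random.seed(seed)
    semaphore = asyncio.Semaphore(concurrency)
    outcomes = []
    await asyncio.gather(*(fake_solve(semaphore, outcomes) for _ in range(total_tasks)))
    return outcomes


outcomes = asyncio.run(dry_run(concurrency=5, total_tasks=20))
ok = sum(1 for status, _ in outcomes if status == "ok")
print(f"dry run: {ok} ok, {len(outcomes) - ok} failed out of {len(outcomes)}")
```

A dry run like this exercises the bookkeeping and reporting path for free; only the real script measures solve times and API limits.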
JavaScript Load Test
// load_test_captcha.js
const axios = require('axios');
const https = require('https');

const API_KEY = process.env.CAPTCHAAI_KEY || 'YOUR_API_KEY';
const SITEKEY = '6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-';
const PAGEURL = 'https://www.google.com/recaptcha/api2/demo';

async function runLoadTest(concurrency, totalTasks) {
  const agent = new https.Agent({ keepAlive: true, maxSockets: concurrency });
  const api = axios.create({
    baseURL: 'https://ocr.captchaai.com', httpsAgent: agent, timeout: 120000,
  });

  const results = { successes: 0, failures: 0, times: [], errors: [] };
  const active = new Set();

  for (let i = 0; i < totalTasks; i++) {
    const p = (async () => {
      const start = Date.now();
      try {
        // Submit
        const submit = await api.get('/in.php', {
          params: { key: API_KEY, method: 'userrecaptcha', googlekey: SITEKEY, pageurl: PAGEURL, json: '1' },
        });
        if (submit.data.status !== 1) {
          results.failures++;
          results.errors.push(submit.data.request);
          return;
        }

        await new Promise(r => setTimeout(r, 15000));

        // Poll: up to 25 attempts, 5 seconds apart
        for (let j = 0; j < 25; j++) {
          const poll = await api.get('/res.php', {
            params: { key: API_KEY, action: 'get', id: submit.data.request, json: '1' },
          });
          if (poll.data.status === 1) {
            results.successes++;
            results.times.push((Date.now() - start) / 1000);
            return;
          }
          if (poll.data.request !== 'CAPCHA_NOT_READY') {
            results.failures++;
            results.errors.push(poll.data.request);
            return;
          }
          await new Promise(r => setTimeout(r, 5000));
        }
        results.failures++;
        results.errors.push('TIMEOUT');
      } catch (e) {
        results.failures++;
        results.errors.push(e.message.slice(0, 50));
      }
    })().then(() => active.delete(p));

    active.add(p);
    // Wait for a free slot once the concurrency limit is reached
    if (active.size >= concurrency) await Promise.race(active);
  }
  await Promise.all(active);
  agent.destroy();

  // Report
  const total = results.successes + results.failures;
  results.times.sort((a, b) => a - b);
  console.log(`\nConcurrency: ${concurrency}`);
  console.log(`Total: ${total}, Success: ${results.successes}, Failed: ${results.failures}`);
  console.log(`Error rate: ${total > 0 ? (results.failures / total * 100).toFixed(1) : 0}%`);
  if (results.times.length) {
    console.log(`P50: ${results.times[Math.floor(results.times.length * 0.5)].toFixed(1)}s`);
    console.log(`P90: ${results.times[Math.floor(results.times.length * 0.9)].toFixed(1)}s`);
  }
  const mem = process.memoryUsage();
  console.log(`Memory RSS: ${(mem.rss / 1024 / 1024).toFixed(1)} MB`);
  return results;
}

(async () => {
  for (const c of [5, 10, 25, 50]) {
    console.log(`\n>>> Load test: concurrency=${c}`);
    await runLoadTest(c, 20);
    await new Promise(r => setTimeout(r, 10000));
  }
})();
Interpreting Results
Good Results
| Metric | Healthy Range |
|---|---|
| Error rate | < 5% |
| P90 solve time | < 2x median |
| Memory growth | Stable (flat) |
| Throughput | Linear with concurrency |
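One way to check whether throughput is "linear with concurrency" is to compare throughput per concurrent slot against the baseline level. The sketch below does this with a hypothetical helper, `scaling_efficiency`; the measurements are invented, and the 0.8 cutoff is an assumed threshold for illustration, not a figure from CaptchaAI.

```python
def scaling_efficiency(throughput_by_level):
    """Return {concurrency: efficiency}; 1.0 means perfectly linear scaling
    relative to the lowest concurrency level measured."""
    levels = sorted(throughput_by_level)
    base = levels[0]
    base_per_slot = throughput_by_level[base] / base
    return {
        level: (throughput_by_level[level] / level) / base_per_slot
        for level in levels
    }


# Hypothetical measurements in solves/min, for illustration only
measured = {5: 15.0, 10: 30.0, 25: 70.0, 50: 90.0}
efficiency = scaling_efficiency(measured)

for level, eff in efficiency.items():
    flag = "" if eff >= 0.8 else "  <- throughput no longer scaling linearly"
    print(f"concurrency={level:>3}: efficiency={eff:.2f}{flag}")
```

In this invented data set, efficiency stays near 1.0 through 25 concurrent tasks and drops to 0.60 at 50, which would mark the inflection point.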
Warning Signs
| Warning | What It Means | Action |
|---|---|---|
| Error rate > 10% at low concurrency | API key or parameter issue | Fix before scaling |
| P90 > 3x median | Network or proxy instability | Check connection quality |
| Memory growing linearly | Possible leak | Profile memory allocations |
| Throughput plateaus | Bottleneck reached | Check connections, CPU, or rate limits |
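The first three warning signs can be checked automatically after each level. The following is a rough sketch: `warning_flags` is a hypothetical helper, the cutoff for "low concurrency" (10) is an assumption, and the memory check is a simple heuristic that flags memory rising at every sample rather than a true leak detector.

```python
LOW_CONCURRENCY = 10  # assumption: what counts as "low" depends on your pipeline


def warning_flags(concurrency, error_rate_pct, p50, p90, memory_mb_samples=()):
    """Return a list of warning messages based on the thresholds in the table."""
    flags = []
    if concurrency <= LOW_CONCURRENCY and error_rate_pct > 10:
        flags.append("high error rate at low concurrency: check API key and parameters")
    if p50 > 0 and p90 > 3 * p50:
        flags.append("P90 above 3x median: check network or proxy stability")
    samples = list(memory_mb_samples)
    if len(samples) >= 3 and all(b > a for a, b in zip(samples, samples[1:])):
        flags.append("memory rose at every sample: profile allocations for a leak")
    return flags


# Invented numbers, for illustration only
bad = warning_flags(5, error_rate_pct=15.0, p50=20.0, p90=70.0,
                    memory_mb_samples=[100, 120, 140])
good = warning_flags(50, error_rate_pct=2.0, p50=20.0, p90=30.0)
print(bad)
print(good)
```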
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| All tasks time out at high concurrency | Connection pool exhaustion | Increase maxSockets (Node.js) or the TCPConnector limit (Python) |
| ERROR_NO_SLOT_AVAILABLE errors | CaptchaAI rate limiting | Reduce concurrency or add delays |
| Test drains balance | Too many real solves | Reduce tasks_per_level |
| Inconsistent results between runs | Network variability | Run multiple iterations and average |
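For `ERROR_NO_SLOT_AVAILABLE`, "add delays" usually means retrying the submit with a growing pause instead of failing the task immediately. Below is a minimal sketch of exponential backoff with jitter. `submit_with_backoff` is a hypothetical wrapper, the delay values are assumptions, and `submit` stands for any coroutine function that returns the parsed JSON from `in.php`. The demo uses a fake submit function, so it makes no network calls.

```python
import asyncio
import random

RETRYABLE = {"ERROR_NO_SLOT_AVAILABLE"}


async def submit_with_backoff(submit, max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Call `submit()` until it succeeds or returns a non-retryable error."""
    result = {}
    for attempt in range(max_attempts):
        result = await submit()
        if result.get("status") == 1 or result.get("request") not in RETRYABLE:
            return result
        if attempt < max_attempts - 1:
            # Double the delay on each attempt, capped at max_delay, with jitter
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
    return result  # still retryable after max_attempts; caller records the failure


async def _demo():
    # Fake responses: two rate-limit errors, then a task ID
    responses = iter([
        {"status": 0, "request": "ERROR_NO_SLOT_AVAILABLE"},
        {"status": 0, "request": "ERROR_NO_SLOT_AVAILABLE"},
        {"status": 1, "request": "12345"},
    ])

    async def fake_submit():
        return next(responses)

    return await submit_with_backoff(fake_submit, base_delay=0.01)


demo_result = asyncio.run(_demo())
print(demo_result)
```

Keep in mind that retries hide rate limiting from the error-rate metric, so count them separately if you adopt this pattern in a load test.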
FAQ
How many solves should I budget for load testing?
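Start with 20 tasks per concurrency level across 4 levels, for 80 total solves. This costs approximately $0.40 at standard pricing. Note that a level only reaches its nominal concurrency when the task count is at least as large as the concurrency, so the 25 and 50 levels need more than 20 tasks to be exercised fully.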
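The arithmetic behind that budget is easy to adapt to other plans. In the sketch below, `estimate_cost` is a hypothetical helper, and the price of $5.00 per 1,000 solves is only what the figures above imply ($0.40 ÷ 80 solves); treat it as an assumption and check your actual rate.

```python
IMPLIED_PRICE_PER_1000_USD = 5.00  # assumption: implied by $0.40 for 80 solves


def estimate_cost(levels, tasks_per_level, price_per_1000_usd):
    """Return (total_solves, estimated_cost_usd) for a fixed-task ramp."""
    total_solves = len(levels) * tasks_per_level
    return total_solves, total_solves * price_per_1000_usd / 1000


solves, cost = estimate_cost([5, 10, 25, 50], 20, IMPLIED_PRICE_PER_1000_USD)
print(f"{solves} solves, about ${cost:.2f}")
```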
Should I load test against production or a test CAPTCHA?
Use the Google reCAPTCHA demo site for testing connection capacity. For solve accuracy testing, use your actual target site.
How often should I re-run load tests?
After infrastructure changes, code updates, or provider upgrades. Quarterly is a good cadence for stable systems.
Related Articles
- Build Automated Testing Pipeline Captchaai
- Github Actions Captchaai Cicd Captcha Testing
- Captchaai Load Balancer Architecture
Next Steps
Find your pipeline's limits before they find you — get your CaptchaAI API key.