CaptchaAI supports high request volumes, but understanding capacity limits and implementing proper throttling keeps your pipeline reliable.
## API Rate Limits
CaptchaAI does not enforce strict per-second rate limits. Instead, capacity is managed by worker availability:
| Factor | Behavior |
|---|---|
| Submit rate | No hard limit per second |
| Concurrent tasks | Handles 100+ per account |
| Poll frequency | Recommended: every 5 seconds per task |
| Balance check | No limit |
The main constraint is worker capacity: when all workers are busy, submissions return `ERROR_NO_SLOT_AVAILABLE`.

## Handling ERROR_NO_SLOT_AVAILABLE

This error means the system is at capacity. Implement exponential backoff:

**Python**
```python
import time

import requests

API_KEY = "YOUR_API_KEY"

def submit_with_backoff(params, max_retries=5):
    params["key"] = API_KEY
    for attempt in range(max_retries):
        resp = requests.get(
            "https://ocr.captchaai.com/in.php", params=params
        )
        if resp.text.startswith("OK|"):
            return resp.text.split("|")[1]
        if resp.text == "ERROR_NO_SLOT_AVAILABLE":
            wait = min(2 ** attempt * 2, 60)  # 2, 4, 8, 16, 32s (capped at 60)
            print(f"No slots, waiting {wait}s (attempt {attempt + 1})")
            time.sleep(wait)
            continue
        raise Exception(f"Submit error: {resp.text}")
    raise Exception("Max retries exceeded — no slots available")
```
**Node.js**

```javascript
const axios = require("axios");

async function submitWithBackoff(params, maxRetries = 5) {
  params.key = process.env.CAPTCHAAI_API_KEY;
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const resp = await axios.get("https://ocr.captchaai.com/in.php", {
      params,
    });
    const text = String(resp.data);
    if (text.startsWith("OK|")) {
      return text.split("|")[1];
    }
    if (text === "ERROR_NO_SLOT_AVAILABLE") {
      const wait = Math.min(2 ** attempt * 2000, 60000);
      console.log(`No slots, waiting ${wait}ms (attempt ${attempt + 1})`);
      await new Promise((r) => setTimeout(r, wait));
      continue;
    }
    throw new Error(`Submit error: ${text}`);
  }
  throw new Error("Max retries exceeded");
}
```
## Client-Side Rate Limiting

### Token Bucket (Python)

```python
import time
import threading

class RateLimiter:
    def __init__(self, rate, per=1.0):
        """Allow `rate` requests per `per` seconds."""
        self.rate = rate
        self.per = per
        self.tokens = rate
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_refill
                self.tokens = min(
                    self.rate, self.tokens + elapsed * (self.rate / self.per)
                )
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                sleep_time = (1 - self.tokens) * (self.per / self.rate)
            # Sleep outside the lock so other threads can refill and
            # acquire, then loop to re-check for an available token.
            time.sleep(sleep_time)

# Allow 10 submissions per second
limiter = RateLimiter(rate=10, per=1.0)

def submit_limited(params):
    limiter.acquire()
    return submit_with_backoff(params)
```
### Asyncio Semaphore

```python
import asyncio

# Limit to 20 concurrent tasks
semaphore = asyncio.Semaphore(20)

async def solve_limited(solver, session, params):
    async with semaphore:
        return await solver.solve(session, params)
```
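To sanity-check this pattern offline, here's a self-contained sketch that swaps the real solver for a stub and records peak concurrency (`solve_stub` and `demo` are illustrative names, not part of the CaptchaAI API):

```python
import asyncio

async def demo():
    semaphore = asyncio.Semaphore(20)
    active = 0
    peak = 0

    async def solve_stub(i):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for a solve round-trip
            active -= 1
        return i

    results = await asyncio.gather(*(solve_stub(i) for i in range(100)))
    return peak, len(results)

peak, n = asyncio.run(demo())
print(f"peak concurrency: {peak}, tasks completed: {n}")
```

Even with 100 tasks launched at once, the number in flight never exceeds the semaphore's limit of 20.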
## Poll Rate Control

Don't poll a given task more often than once every 5 seconds:

```python
import asyncio

async def smart_poll(session, task_id, solver):
    """Polls with adaptive intervals, backing off for slow tasks."""
    intervals = [5, 5, 5, 10, 10, 15, 15, 30, 30, 60]
    for wait in intervals:
        await asyncio.sleep(wait)
        result = await solver.check(session, task_id)
        if result is not None:
            return result
    raise TimeoutError(f"Task {task_id} timed out")
```
## Monitoring and Metrics

Track your API usage to stay within limits:

```python
import time
from collections import deque

class APIMetrics:
    def __init__(self, window=60):
        self.window = window
        self.requests = deque()
        self.errors = deque()

    def record_request(self):
        now = time.time()
        self.requests.append(now)
        self._cleanup(self.requests, now)

    def record_error(self, error_code):
        now = time.time()
        self.errors.append((now, error_code))
        self._cleanup_tuples(self.errors, now)

    def get_rate(self):
        """Requests per second over the sliding window."""
        now = time.time()
        self._cleanup(self.requests, now)
        return len(self.requests) / self.window

    def get_error_rate(self):
        """Fraction of windowed requests that errored."""
        now = time.time()
        self._cleanup(self.requests, now)
        self._cleanup_tuples(self.errors, now)
        if not self.requests:
            return 0
        return len(self.errors) / len(self.requests)

    def _cleanup(self, dq, now):
        while dq and dq[0] < now - self.window:
            dq.popleft()

    def _cleanup_tuples(self, dq, now):
        while dq and dq[0][0] < now - self.window:
            dq.popleft()

metrics = APIMetrics()

# Use in your submit function
def submit_tracked(params):
    metrics.record_request()
    try:
        return submit_with_backoff(params)
    except Exception as e:
        metrics.record_error(str(e))
        raise

# Check metrics periodically
print(f"Rate: {metrics.get_rate():.1f} req/s")
print(f"Error rate: {metrics.get_error_rate():.1%}")
```
## Capacity Planning
| Volume | Concurrency | Strategy |
|---|---|---|
| < 100/hour | 1-5 | Sequential, no rate control needed |
| 100-1K/hour | 5-20 | Semaphore-based concurrency |
| 1K-10K/hour | 20-50 | Async with queue, callbacks |
| 10K+/hour | 50-100 | Worker pool, dedicated capacity |
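For the top tiers, one possible shape for a worker pool is a shared `asyncio.Queue` drained by a fixed number of consumers. This is a sketch under assumptions, not a prescribed design; `run_pool` and `fake_solve` are hypothetical names, and a real pool would call the submit/poll helpers shown earlier instead of a stub:

```python
import asyncio

async def _worker(queue, results, solve):
    # Each worker pulls jobs until the pool cancels it.
    while True:
        params = await queue.get()
        try:
            results.append(await solve(params))
        finally:
            queue.task_done()

async def run_pool(jobs, solve, n_workers=50):
    """Drain `jobs` through `n_workers` concurrent consumers."""
    queue = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(_worker(queue, results, solve))
        for _ in range(n_workers)
    ]
    for job in jobs:
        queue.put_nowait(job)
    await queue.join()  # wait until every job is processed
    for w in workers:
        w.cancel()
    return results

async def _demo():
    async def fake_solve(params):
        await asyncio.sleep(0.001)  # stand-in for a real solve round-trip
        return params * 2

    return await run_pool(range(10), fake_solve, n_workers=4)

results = asyncio.run(_demo())
```

The queue decouples submission from consumption, so bursts of incoming jobs simply accumulate instead of overwhelming the API.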
For volumes above 10K/hour, contact CaptchaAI support for dedicated capacity.
## FAQ

**Is there a daily limit?**

No hard daily limit. Your limit is determined by your balance and the API's worker capacity.

**What happens if I exceed capacity?**

You'll receive `ERROR_NO_SLOT_AVAILABLE`. It's temporary; retry after a backoff period.

**Should I throttle on the client side?**

Yes, for predictable behavior. Use a semaphore to cap concurrent tasks and a token bucket to control submission rate.
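One way to combine the two, sketched as a hypothetical helper (`AsyncThrottle` is an illustrative name, not part of the CaptchaAI API): take a token before submitting, then hold a semaphore slot while the task is in flight.

```python
import asyncio
import time

class AsyncThrottle:
    """Caps submission rate with a token bucket and in-flight
    tasks with a semaphore."""

    def __init__(self, rate, concurrency):
        self.rate = rate                  # submissions per second
        self.tokens = float(rate)
        self.last = time.monotonic()
        self.lock = asyncio.Lock()
        self.sem = asyncio.Semaphore(concurrency)

    async def _take_token(self):
        async with self.lock:
            while True:
                now = time.monotonic()
                self.tokens = min(
                    self.rate, self.tokens + (now - self.last) * self.rate
                )
                self.last = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                await asyncio.sleep((1 - self.tokens) / self.rate)

    async def run(self, fn, *args):
        await self._take_token()          # rate-limit the submission...
        async with self.sem:              # ...then cap concurrency
            return await fn(*args)

async def _demo():
    throttle = AsyncThrottle(rate=5, concurrency=2)

    async def task(i):
        await asyncio.sleep(0.01)  # stand-in for submit + poll
        return i

    return await asyncio.gather(*(throttle.run(task, i) for i in range(8)))

results = asyncio.run(_demo())
```

With `rate=5` the first five submissions go out immediately (the bucket starts full) and the rest are spaced out at the refill rate, while the semaphore independently bounds how many tasks run at once.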