DevOps & Scaling

Auto-Scaling CAPTCHA Solving Workers

Static worker pools waste money during quiet times and create bottlenecks during peaks. Auto-scaling matches worker count to actual demand, optimizing both cost and throughput.


Scaling Signals

| Signal | Scale Up When | Scale Down When |
|---|---|---|
| Queue depth | > 20 pending tasks | < 5 pending tasks |
| Worker utilization | > 80% busy | < 20% busy |
| Solve latency | P95 > 60 seconds | P95 < 20 seconds |
| Error rate | > 5% (need fresh workers) | Stable < 1% |
| Balance | N/A | Balance < $1 (stop scaling) |

Thread-Based Auto-Scaler

Scale worker threads within a single process:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads to match queue depth.

    Tasks are consumed from a Redis list (``captcha:tasks``) and results
    are written to a Redis hash (``captcha:results``) keyed by task id.
    A background scaler thread adds or lazily removes worker threads.
    """

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        """
        Args:
            api_key: CaptchaAI API key used for all solve requests.
            redis_url: Connection URL for the Redis task queue.
        """
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []       # live worker threads; guarded by self.lock
        self.active_count = 0   # workers currently solving; guarded by self.lock
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers plus the background scaler."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Scaler runs forever in the background, adjusting pool size.
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add one worker thread (no-op when already at max capacity)."""
        with self.lock:
            if len(self.workers) >= self.max_workers:
                return
            t = threading.Thread(target=self._worker_loop, daemon=True)
            # Append BEFORE starting: the worker loop exits when its thread
            # is absent from self.workers, so starting first races the
            # membership check and could make the thread exit immediately.
            self.workers.append(t)
        t.start()

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal).

        The popped thread notices it is no longer in ``self.workers`` at the
        top of its loop and exits after finishing any in-flight task.
        """
        with self.lock:
            if len(self.workers) <= self.min_workers:
                return
            self.workers.pop()

    def _worker_loop(self):
        """Worker loop: block on the queue, solve, store the result.

        Every per-task failure (malformed payload, API error, timeout) is
        recorded or skipped so one bad task never kills the worker thread.
        """
        me = threading.current_thread()
        while self.running and me in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                # Timed out waiting — loop back and re-check membership.
                continue

            _, raw = result
            try:
                task = json.loads(raw)
                task_id = task["id"]
            except (ValueError, KeyError, TypeError) as e:
                # Malformed payload would otherwise raise outside the solve
                # try-block and permanently kill this worker. Drop and move on.
                print(f"Skipping malformed task: {e}")
                continue

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Every 10s, adjust worker count from queue depth and utilization."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            # Snapshot shared state under the lock for a consistent reading.
            with self.lock:
                current = len(self.workers)
                active = self.active_count
            utilization = active / current * 100 if current > 0 else 0

            # Scale up: queue growing and workers busy.
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue nearly empty and workers mostly idle.
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        """Submit a CAPTCHA to CaptchaAI and poll until solved.

        Args:
            method: CaptchaAI method name for the ``in.php`` submit call.
            params: Extra form fields merged into the submit payload.
            timeout: Maximum seconds to wait for a solution.

        Returns:
            The solved token string.

        Raises:
            RuntimeError: On any API error response.
            TimeoutError: When no solution arrives within ``timeout``.
        """
        # Use a dedicated name for the submit payload; the original rebound
        # `data` to the poll response, shadowing the request body.
        payload = {"key": self.api_key, "method": method, "json": 1}
        payload.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=payload, timeout=30,
        )
        submit = resp.json()

        if submit.get("status") != 1:
            raise RuntimeError(submit.get("request"))

        captcha_id = submit["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            poll = resp.json()
            # "CAPCHA_NOT_READY" (sic) is the API's literal not-ready marker.
            if poll["request"] != "CAPCHA_NOT_READY":
                if poll.get("status") == 1:
                    return poll["request"]
                raise RuntimeError(poll["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        """Return a consistent snapshot: worker count, active count, queue depth."""
        with self.lock:
            workers = len(self.workers)
            active = self.active_count
        return {
            "workers": workers,
            "active": active,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage: read the API key from the environment (never hard-code secrets).
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor: scaling happens in background daemon threads; this foreground
# loop keeps the process alive and prints a stats snapshot every 30 seconds
# (runs until the process is killed).
while True:
    print(pool.stats())
    time.sleep(30)

Process-Based Auto-Scaler

Scale worker processes for CPU isolation:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on Redis queue depth.

    Args:
        worker_fn: Zero-argument callable run as each worker process's target.
        redis_url: Connection URL for the Redis task queue.
    """

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []  # live worker processes
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop forever, re-checking every ``check_interval`` s."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up: roughly one worker per 5 queued tasks.
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down: queue nearly drained, remove at most 2 at a time.
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    # NOTE: terminate() may interrupt an in-flight task;
                    # acceptable here since tasks are retried from the queue
                    # by design — confirm worker_fn is idempotent.
                    p.terminate()
                    # join() reaps the terminated child; without it the
                    # process lingers as a zombie until interpreter exit.
                    p.join(timeout=5)
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        """Start one new worker process and track it."""
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        """Drop (and reap) crashed processes, then restore the minimum count."""
        alive = []
        for p in self.processes:
            if p.is_alive():
                alive.append(p)
            else:
                # Reap the exited child so it doesn't stay a zombie.
                p.join(timeout=0)
        self.processes = alive
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

Balance-Aware Scaling

Stop scaling when funds run low:

def check_balance(api_key, min_balance=2.0):
    """Return True when the account balance permits further scale-up.

    Fails closed: any network error or non-numeric API response (e.g. an
    "ERROR_WRONG_USER_KEY" string in the "request" field, which would make
    float() raise) is treated as insufficient balance, so a broken balance
    check can never trigger runaway scaling.

    Args:
        api_key: CaptchaAI API key.
        min_balance: Minimum balance (USD) required to allow scale-up.
    """
    try:
        resp = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": api_key,
            "action": "getbalance",
            "json": 1,
        }, timeout=15)
        balance = float(resp.json()["request"])
    except (requests.RequestException, ValueError, KeyError) as e:
        print(f"Balance check failed ({e}) — halting scale-up")
        return False

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

Integrate into the scaling loop:

# In _scaling_loop: gate the scale-up branch on available balance so the
# pool never adds workers the account cannot pay for.
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up (add workers exactly as in the original branch)
        ...
    else:
        print("Scaling paused — low balance")

Scaling Strategies Compared

| Strategy | Best For | Latency | Complexity |
|---|---|---|---|
| Thread pool | I/O-bound (API calls) | Low | Low |
| Process pool | CPU-bound preprocessing | Medium | Medium |
| Kubernetes HPA | Cloud-native deployments | Higher | High |
| KEDA | Event-driven scaling | Medium | Medium |

Troubleshooting

| Issue | Cause | Fix |
|---|---|---|
| Workers keep scaling up | Queue never drains | Check if workers are actually processing |
| Scale-down too aggressive | Low threshold | Increase scale-down delay to 30s+ |
| Zombie processes | Terminated processes never joined | Call `join()` after `terminate()` and run `_cleanup_dead()` regularly |
| Balance drains fast | Too many workers | Add balance check to scaling logic |

FAQ

What's the right worker-to-queue ratio?

Aim for 1 worker per 5-10 queued tasks. Each worker processes ~3-6 CAPTCHAs per minute depending on type.

Should I use threads or processes?

Threads for pure API calling (CaptchaAI is I/O-bound). Processes when you also do image preprocessing or heavy computation alongside solving.

How fast should I scale up?

Scale up quickly (every 10-15s check), scale down slowly (wait 30-60s of low load). This prevents thrashing between states.



Scale smart — get your CaptchaAI key today.

Discussions (0)

No comments yet.

Related Posts

DevOps & Scaling Horizontal Scaling CAPTCHA Solving Workers: When and How
Scale CAPTCHA solving horizontally — identify bottlenecks, add workers dynamically, auto-scale based on queue depth, and manage costs with Captcha AI.

Scale CAPTCHA solving horizontally — identify bottlenecks, add workers dynamically, auto-scale based on queue...

Automation Python All CAPTCHA Types
Mar 07, 2026
DevOps & Scaling Ansible Playbooks for CaptchaAI Worker Deployment
Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates, and health checks across your server fleet.

Deploy and manage Captcha AI workers with Ansible — playbooks for provisioning, configuration, rolling updates...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling Blue-Green Deployment for CAPTCHA Solving Infrastructure
Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switching, and rollback strategies with Captcha AI.

Implement blue-green deployments for CAPTCHA solving infrastructure — zero-downtime upgrades, traffic switchin...

Automation Python All CAPTCHA Types
Apr 07, 2026
DevOps & Scaling CaptchaAI Monitoring with Datadog: Metrics and Alerts
Monitor Captcha AI performance with Datadog — custom metrics, dashboards, anomaly detection alerts, and solve rate tracking for CAPTCHA solving pipelines.

Monitor Captcha AI performance with Datadog — custom metrics, dashboards, anomaly detection alerts, and solve...

Automation Python All CAPTCHA Types
Feb 19, 2026
DevOps & Scaling OpenTelemetry Tracing for CAPTCHA Solving Pipelines
Instrument CAPTCHA solving pipelines with Open Telemetry — distributed traces, spans for submit/poll phases, and vendor-neutral observability with Captcha AI.

Instrument CAPTCHA solving pipelines with Open Telemetry — distributed traces, spans for submit/poll phases, a...

Automation Python All CAPTCHA Types
Mar 07, 2026
DevOps & Scaling Rolling Updates for CAPTCHA Solving Worker Fleets
Implement rolling updates for CAPTCHA solving worker fleets — zero-downtime upgrades, graceful draining, health-gated progression, and automatic rollback.

Implement rolling updates for CAPTCHA solving worker fleets — zero-downtime upgrades, graceful draining, healt...

Automation Python All CAPTCHA Types
Feb 28, 2026
DevOps & Scaling CaptchaAI Behind a Load Balancer: Architecture Patterns
Architect CAPTCHA solving workers behind a load balancer — routing strategies, health checks, sticky sessions, and scaling patterns with Captcha AI.

Architect CAPTCHA solving workers behind a load balancer — routing strategies, health checks, sticky sessions,...

Automation Python All CAPTCHA Types
Feb 24, 2026
DevOps & Scaling CaptchaAI Monitoring with New Relic: APM Integration
Integrate Captcha AI with New Relic APM — custom events, transaction tracing, dashboards, and alert policies for CAPTCHA solving performance.

Integrate Captcha AI with New Relic APM — custom events, transaction tracing, dashboards, and alert policies f...

Automation Python All CAPTCHA Types
Jan 31, 2026
Reference CAPTCHA Solving Performance by Region: Latency Analysis
Analyze how geographic region affects Captcha AI solve times — network latency, proxy location, and optimization strategies for global deployments.

Analyze how geographic region affects Captcha AI solve times — network latency, proxy location, and optimizati...

Automation Python All CAPTCHA Types
Apr 05, 2026
DevOps & Scaling GitHub Actions + CaptchaAI: CI/CD CAPTCHA Testing
Integrate Captcha AI with Git Hub Actions for automated CAPTCHA testing in CI/CD pipelines.

Integrate Captcha AI with Git Hub Actions for automated CAPTCHA testing in CI/CD pipelines. Test flows, verify...

Python reCAPTCHA v2 Testing
Feb 04, 2026
DevOps & Scaling High Availability CAPTCHA Solving: Failover and Redundancy
Build a high-availability CAPTCHA solving system — automatic failover, health checks, redundant workers, and graceful degradation with Captcha AI.

Build a high-availability CAPTCHA solving system — automatic failover, health checks, redundant workers, and g...

Automation Python All CAPTCHA Types
Mar 27, 2026
DevOps & Scaling Docker + CaptchaAI: Containerized CAPTCHA Solving
Run Captcha AI integrations in Docker containers.

Run Captcha AI integrations in Docker containers. Dockerfile, environment variables, multi-stage builds, and D...

Automation Python All CAPTCHA Types
Mar 09, 2026