Kubernetes provides auto-scaling, self-healing infrastructure for high-volume CAPTCHA solving. This guide deploys worker pods that pull tasks from a Redis queue and scale based on demand.
Architecture
```
Producer → Redis Queue → Worker Pods (auto-scaled) → CaptchaAI API
                              ↓
                    Results Store (Redis)
```
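Tasks on the queue are plain JSON objects. The fields below (`id`, `method`, `params`) match what the worker code later in this guide reads; the reCAPTCHA method name and parameters are illustrative placeholders:

```python
import json

# A queued task as the worker expects it: an "id" for result lookup,
# plus the CaptchaAI "method" and its submission parameters.
task = {
    "id": "a1b2c3d4",
    "method": "userrecaptcha",          # illustrative method name
    "params": {
        "googlekey": "SITE_KEY_HERE",   # placeholder site key
        "pageurl": "https://example.com/login",
    },
}

payload = json.dumps(task)   # what the producer RPUSHes to captcha:queue
restored = json.loads(payload)  # what the worker sees after BLPOP
```

Keeping the payload flat and JSON-serializable means any language can act as a producer.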
Worker Deployment
```yaml
# k8s/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: captcha-worker
  labels:
    app: captcha-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: captcha-worker
  template:
    metadata:
      labels:
        app: captcha-worker
    spec:
      containers:
        - name: worker
          image: your-registry/captcha-worker:latest
          env:
            - name: CAPTCHAAI_KEY
              valueFrom:
                secretKeyRef:
                  name: captchaai-secret
                  key: api-key
            - name: REDIS_URL
              value: "redis://redis-service:6379"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"
```
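The Deployment references `your-registry/captcha-worker:latest`, so that image needs to be built and pushed first. A minimal Dockerfile sketch, assuming the file layout matches the `worker.py` shown below:

```dockerfile
FROM python:3.12-slim
WORKDIR /app
RUN pip install --no-cache-dir redis requests
COPY worker.py .
CMD ["python", "worker.py"]
```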
Kubernetes Secret
```bash
kubectl create secret generic captchaai-secret \
  --from-literal=api-key=YOUR_API_KEY
```
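If you prefer declarative manifests, the same Secret can be expressed in YAML; `stringData` lets you skip manual base64 encoding (keep this file out of version control):

```yaml
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: captchaai-secret
type: Opaque
stringData:
  api-key: YOUR_API_KEY
```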
Redis Deployment
```yaml
# k8s/redis.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - port: 6379
```
Worker Code
```python
# worker.py
import os
import json
import time

import redis
import requests


class CaptchaWorker:
    """Kubernetes worker that processes CAPTCHA tasks from Redis."""

    def __init__(self):
        self.api_key = os.environ["CAPTCHAAI_KEY"]
        self.redis = redis.from_url(
            os.environ.get("REDIS_URL", "redis://localhost:6379"),
        )
        self.base = "https://ocr.captchaai.com"

    def run(self):
        """Main worker loop."""
        hostname = os.environ.get("HOSTNAME", "unknown")
        print(f"Worker {hostname} started")
        while True:
            result = self.redis.blpop("captcha:queue", timeout=30)
            if result is None:
                continue
            _, raw = result
            task = json.loads(raw)
            task_id = task.get("id", "unknown")
            print(f"[{hostname}] Processing {task_id}")
            start = time.time()
            try:
                token = self._solve(task["method"], task["params"])
                duration = time.time() - start
                self.redis.hset("captcha:results", task_id, json.dumps({
                    "status": "success",
                    "token": token,
                    "duration": f"{duration:.1f}s",
                    "worker": hostname,
                }))
                print(f"[{hostname}] {task_id} solved in {duration:.1f}s")
            except Exception as e:
                self.redis.hset("captcha:results", task_id, json.dumps({
                    "status": "error",
                    "error": str(e),
                    "worker": hostname,
                }))
                print(f"[{hostname}] {task_id} failed: {e}")
            # Update queue length metric
            queue_len = self.redis.llen("captcha:queue")
            self.redis.set("captcha:queue_length", queue_len)

    def _solve(self, method, params, timeout=120):
        resp = requests.post(f"{self.base}/in.php", data={
            "key": self.api_key,
            "method": method,
            "json": 1,
            **params,
        }, timeout=30)
        result = resp.json()
        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))
        captcha_id = result["request"]
        start = time.time()
        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])
        raise TimeoutError("Solve timeout")


if __name__ == "__main__":
    CaptchaWorker().run()
```
Horizontal Pod Autoscaler
Scale workers based on queue depth:
```yaml
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: captcha-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: captcha-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: redis_queue_length
          selector:
            matchLabels:
              queue: captcha
        target:
          type: AverageValue
          averageValue: "10"
```
Task Producer
```python
import json
import time
import uuid

import redis


def submit_tasks(redis_url, tasks):
    """Submit CAPTCHA tasks to the queue."""
    r = redis.from_url(redis_url)
    task_ids = []
    for task in tasks:
        task_id = str(uuid.uuid4())[:8]
        task["id"] = task_id
        r.rpush("captcha:queue", json.dumps(task))
        task_ids.append(task_id)
    return task_ids


def get_results(redis_url, task_ids, timeout=180):
    """Wait for and collect results."""
    r = redis.from_url(redis_url)
    results = {}
    deadline = time.time() + timeout
    while len(results) < len(task_ids) and time.time() < deadline:
        for tid in task_ids:
            if tid in results:
                continue
            raw = r.hget("captcha:results", tid)
            if raw:
                results[tid] = json.loads(raw)
        time.sleep(1)
    return results
```
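Results come back tagged with a `"status"` field, so failed tasks can be identified and requeued. A small helper for partitioning the dict returned by `get_results` (the function name is illustrative, and the error code shown is just an example value):

```python
def split_results(results):
    """Partition get_results() output into successes and failures."""
    ok = {tid: r for tid, r in results.items() if r.get("status") == "success"}
    failed = {tid: r for tid, r in results.items() if r.get("status") != "success"}
    return ok, failed

# Example with the result shapes the worker writes to captcha:results:
ok, failed = split_results({
    "a1": {"status": "success", "token": "tok", "duration": "8.2s"},
    "b2": {"status": "error", "error": "ERROR_ZERO_BALANCE"},
})
```

Failures can then be passed straight back to `submit_tasks` for another attempt, ideally with a retry cap.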
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Workers not starting | Secret not created | Run the `kubectl create secret` command above |
| Pods in CrashLoopBackOff | Missing env vars or unreachable Redis | Check logs with `kubectl logs <pod>` |
| HPA not scaling | Custom metrics not configured | Install a metrics adapter (e.g. KEDA) |
| Queue growing but no processing | Workers idle or crashed | Check pod health and restart |
FAQ
How many worker pods should I start with?
Start with 3 replicas and let the HPA scale based on queue depth. The single-threaded worker shown here handles one solve at a time, so throughput scales with replica count; a threaded or async variant can push this to roughly 5-10 concurrent solves per pod, depending on CAPTCHA type.
Should I use Jobs or Deployments?
Use Deployments for continuous workers that process a shared queue. Use Jobs for batch workloads with a fixed number of tasks.
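For the batch case, a Job sketch that runs the same worker image (the `completions`/`parallelism` values are illustrative):

```yaml
# k8s/batch-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: captcha-batch
spec:
  completions: 5    # total pods to run to completion
  parallelism: 5    # how many run at once
  backoffLimit: 2
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: worker
          image: your-registry/captcha-worker:latest
          env:
            - name: CAPTCHAAI_KEY
              valueFrom:
                secretKeyRef:
                  name: captchaai-secret
                  key: api-key
            - name: REDIS_URL
              value: "redis://redis-service:6379"
```

Note that the worker loop in this guide runs forever; for a Job, it would need to exit once the queue is empty so the pods can reach completion.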
Can I use KEDA instead of HPA?
Yes. KEDA (Kubernetes Event-Driven Autoscaling) natively supports Redis queue length as a scaling trigger and is easier to configure than custom metrics.
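A KEDA equivalent of the HPA above, using KEDA's built-in Redis list scaler (trigger metadata follows the `redis` scaler; verify field names against the KEDA version you install):

```yaml
# k8s/keda-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: captcha-worker-scaler
spec:
  scaleTargetRef:
    name: captcha-worker    # the Deployment to scale
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: redis
      metadata:
        address: redis-service:6379
        listName: "captcha:queue"
        listLength: "10"    # target tasks per replica
```

With KEDA reading `LLEN` directly, the manual `captcha:queue_length` metric written by the worker is no longer needed.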