A CAPTCHA solving pipeline that loses in-flight tasks during an outage costs you data, time, and money. Disaster recovery (DR) planning ensures you can recover from infrastructure failures, API outages, or configuration mistakes with minimal data loss.
## DR Objectives
| Metric | Definition | CAPTCHA Pipeline Target |
|---|---|---|
| RPO (Recovery Point Objective) | Maximum tolerable data loss | < 5 minutes of queued tasks |
| RTO (Recovery Time Objective) | Maximum time to restore service | < 15 minutes |
| MTTR (Mean Time to Recovery) | Average recovery time | < 10 minutes |
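These targets can be checked against a real incident timeline with simple timestamp arithmetic. The timestamps below are hypothetical, purely to illustrate the calculation:

```python
from datetime import datetime, timedelta

# Hypothetical incident timeline used to check the targets above
last_checkpoint  = datetime(2024, 6, 1, 12, 0)   # last durable write before failure
outage_start     = datetime(2024, 6, 1, 12, 4)   # failure begins
service_restored = datetime(2024, 6, 1, 12, 13)  # pipeline healthy again

rpo_actual = outage_start - last_checkpoint      # work that could be lost
rto_actual = service_restored - outage_start     # total downtime

print(f"RPO: {rpo_actual} (target < 5 min): "
      f"{'OK' if rpo_actual < timedelta(minutes=5) else 'MISS'}")
print(f"RTO: {rto_actual} (target < 15 min): "
      f"{'OK' if rto_actual < timedelta(minutes=15) else 'MISS'}")
```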
## Failure Scenarios

| Scenario | Response |
|---|---|
| Worker crash | Restart workers, replay queue |
| Queue data loss | Restore from persistent backup |
| Network partition | Fail over to secondary region |
| API key compromised | Rotate key, update workers |
| Config corruption | Roll back to last known good |
## Task Persistence Layer
Never solve CAPTCHAs from an in-memory-only queue. Persist tasks to survive crashes.
### Python — Persistent Task Queue
```python
import os
import json
import sqlite3
import threading
from datetime import datetime, timedelta

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

class PersistentTaskQueue:
    """SQLite-backed task queue that survives crashes."""

    def __init__(self, db_path="captcha_tasks.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                payload TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                started_at TEXT,
                completed_at TEXT,
                result TEXT,
                attempts INTEGER DEFAULT 0
            )
        """)
        self.conn.commit()

    def enqueue(self, task_id, payload):
        with self.lock:
            # INSERT OR IGNORE makes enqueue idempotent: re-submitting
            # the same task id after a crash is a harmless no-op.
            self.conn.execute(
                "INSERT OR IGNORE INTO tasks (id, payload) VALUES (?, ?)",
                (task_id, json.dumps(payload))
            )
            self.conn.commit()

    def dequeue(self):
        with self.lock:
            cursor = self.conn.execute(
                "SELECT id, payload FROM tasks "
                "WHERE status = 'pending' ORDER BY created_at LIMIT 1"
            )
            row = cursor.fetchone()
            if not row:
                return None
            task_id, payload = row
            self.conn.execute(
                "UPDATE tasks SET status = 'processing', "
                "started_at = ?, attempts = attempts + 1 WHERE id = ?",
                (datetime.utcnow().isoformat(), task_id)
            )
            self.conn.commit()
            return {"id": task_id, "payload": json.loads(payload)}

    def complete(self, task_id, result):
        with self.lock:
            self.conn.execute(
                "UPDATE tasks SET status = 'completed', "
                "completed_at = ?, result = ? WHERE id = ?",
                (datetime.utcnow().isoformat(), json.dumps(result), task_id)
            )
            self.conn.commit()

    def fail(self, task_id, error):
        with self.lock:
            # Requeue if under the retry limit; otherwise mark failed
            cursor = self.conn.execute(
                "SELECT attempts FROM tasks WHERE id = ?", (task_id,)
            )
            row = cursor.fetchone()
            if row and row[0] < 3:
                self.conn.execute(
                    "UPDATE tasks SET status = 'pending' WHERE id = ?",
                    (task_id,)
                )
            else:
                self.conn.execute(
                    "UPDATE tasks SET status = 'failed', "
                    "result = ? WHERE id = ?",
                    (json.dumps({"error": error}), task_id)
                )
            self.conn.commit()

    def recover_stale(self, timeout_seconds=600):
        """Reset tasks stuck in 'processing' after a crash."""
        with self.lock:
            # started_at is stored as an ISO-8601 string, so compare it
            # against an ISO cutoff: ISO strings sort chronologically.
            cutoff = (
                datetime.utcnow() - timedelta(seconds=timeout_seconds)
            ).isoformat()
            cursor = self.conn.execute(
                "UPDATE tasks SET status = 'pending' "
                "WHERE status = 'processing' AND started_at < ?",
                (cutoff,)
            )
            self.conn.commit()
            # rowcount counts rows changed by this statement only;
            # conn.total_changes would count every change on the connection.
            return cursor.rowcount

    @property
    def stats(self):
        cursor = self.conn.execute(
            "SELECT status, COUNT(*) FROM tasks GROUP BY status"
        )
        return dict(cursor.fetchall())

# On startup: recover tasks that were processing during a crash
queue = PersistentTaskQueue()
recovered = queue.recover_stale(timeout_seconds=600)
print(f"Recovered {recovered} stale tasks after restart")
```
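A worker process drains this queue in a loop. The sketch below is an assumption about how you might wire it up: `solve` stands in for the real CaptchaAI call (not shown here). Any crash between `dequeue()` and `complete()` leaves the row in `processing`, which `recover_stale()` resets on the next startup.

```python
import time

def worker_loop(queue, solve, poll_interval=1.0, run_forever=True):
    """Drain tasks from a PersistentTaskQueue-style object.

    `solve` is a callable taking the task payload and returning a
    result dict; it stands in for the real API call.
    """
    while True:
        task = queue.dequeue()           # marks the row 'processing'
        if task is None:
            if not run_forever:
                return                   # queue drained (useful for tests)
            time.sleep(poll_interval)    # queue empty; poll again
            continue
        try:
            result = solve(task["payload"])
            queue.complete(task["id"], result)
        except Exception as exc:
            # fail() requeues up to the retry limit, then marks 'failed'
            queue.fail(task["id"], str(exc))
```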
### JavaScript — Recovery Manager
```javascript
const axios = require("axios");
const fs = require("fs");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

class DisasterRecoveryManager {
  constructor(checkpointDir = "./dr-checkpoints") {
    this.checkpointDir = checkpointDir;
    if (!fs.existsSync(checkpointDir)) {
      fs.mkdirSync(checkpointDir, { recursive: true });
    }
  }

  checkpoint(label, data) {
    const filename = `${this.checkpointDir}/${label}-${Date.now()}.json`;
    fs.writeFileSync(filename, JSON.stringify(data, null, 2));
    this.pruneOldCheckpoints(label, 10); // Keep the last 10
    return filename;
  }

  restore(label) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort()
      .reverse();
    if (files.length === 0) return null;
    const latest = fs.readFileSync(
      `${this.checkpointDir}/${files[0]}`, "utf8"
    );
    return JSON.parse(latest);
  }

  pruneOldCheckpoints(label, keep) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort();
    while (files.length > keep) {
      const old = files.shift();
      fs.unlinkSync(`${this.checkpointDir}/${old}`);
    }
  }

  async healthCheck() {
    try {
      const resp = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "getbalance", json: 1 },
        timeout: 10000,
      });
      return {
        healthy: resp.data.status === 1,
        balance: parseFloat(resp.data.request || 0),
      };
    } catch (err) {
      return { healthy: false, error: err.message };
    }
  }
}

class ResilientSolver {
  constructor() {
    this.dr = new DisasterRecoveryManager();
  }

  async solveBatch(tasks) {
    // Checkpoint before starting
    this.dr.checkpoint("batch-pending", {
      tasks,
      startedAt: new Date().toISOString(),
    });
    const results = [];
    for (const task of tasks) {
      try {
        const result = await this.solveSingle(task);
        results.push({ taskId: task.id, ...result });
      } catch (err) {
        results.push({ taskId: task.id, error: err.message });
      }
      // Checkpoint progress every 10 tasks
      if (results.length % 10 === 0) {
        this.dr.checkpoint("batch-progress", {
          results,
          remaining: tasks.length - results.length,
        });
      }
    }
    // Final checkpoint
    this.dr.checkpoint("batch-complete", { results });
    return results;
  }

  async recover() {
    // Check for an incomplete batch
    const progress = this.dr.restore("batch-progress");
    const pending = this.dr.restore("batch-pending");
    if (progress) {
      const completedIds = new Set(progress.results.map((r) => r.taskId));
      const remaining = pending?.tasks.filter((t) => !completedIds.has(t.id));
      console.log(
        `Recovering: ${progress.results.length} done, ${remaining?.length || 0} remaining`
      );
      return remaining || [];
    }
    if (pending) {
      console.log(`Recovering full batch: ${pending.tasks.length} tasks`);
      return pending.tasks;
    }
    return [];
  }

  async solveSingle(task) {
    const resp = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: {
        key: API_KEY,
        method: "userrecaptcha",
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    });
    if (resp.data.status !== 1) throw new Error(resp.data.request);
    const captchaId = resp.data.request;
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
      });
      if (poll.data.status === 1) return { solution: poll.data.request };
      if (poll.data.request !== "CAPCHA_NOT_READY")
        throw new Error(poll.data.request);
    }
    throw new Error("TIMEOUT");
  }
}

// Start with a recovery check
const solver = new ResilientSolver();
solver.recover().then((remaining) => {
  if (remaining.length > 0) {
    console.log(`Resuming ${remaining.length} tasks from checkpoint`);
    solver.solveBatch(remaining);
  }
});
```
## DR Runbook Template

```text
RUNBOOK: CAPTCHA Pipeline Recovery
====================================
1. DETECT
   - Alert fires: [PagerDuty / Slack / Email]
   - Symptom: [Queue growing / Workers offline / Error spike]

2. ASSESS
   - Check worker health: curl http://workers/health
   - Check API status:    GET /res.php?action=getbalance
   - Check queue depth:   SELECT COUNT(*) FROM tasks WHERE status='pending'

3. RECOVER
   If: Workers crashed
     → Restart worker containers: docker-compose up -d workers
     → Run stale task recovery:   recovery.py --recover-stale
   If: Network partition
     → Fail over to secondary region
     → Update DNS or load balancer routing
   If: API key compromised
     → Generate new key at captchaai.com
     → Update secret store
     → Rolling-restart workers

4. VERIFY
   - Confirm solve rate > 90%
   - Confirm queue is draining
   - Confirm no duplicate solves

5. POST-MORTEM
   - Document root cause
   - Update runbook if needed
```
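The `recovery.py --recover-stale` command referenced in step 3 is illustrative, not a shipped tool. A minimal sketch of it, assuming the `PersistentTaskQueue` class from the Python section above (the script name and flags are assumptions):

```python
# recovery.py (sketch): script name and flags are illustrative.
import argparse

def build_parser():
    parser = argparse.ArgumentParser(description="CAPTCHA pipeline recovery")
    parser.add_argument("--recover-stale", action="store_true",
                        help="requeue tasks stuck in 'processing'")
    parser.add_argument("--timeout-seconds", type=int, default=600,
                        help="age at which a 'processing' task counts as stale")
    parser.add_argument("--db", default="captcha_tasks.db",
                        help="path to the SQLite task database")
    return parser

args = build_parser().parse_args(["--recover-stale", "--timeout-seconds", "300"])
print(args.recover_stale, args.timeout_seconds, args.db)

# Wiring (assumes PersistentTaskQueue from the Python section):
#     queue = PersistentTaskQueue(args.db)
#     if args.recover_stale:
#         print(f"Requeued {queue.recover_stale(args.timeout_seconds)} tasks")
```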
## Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Tasks lost during crash | In-memory queue only | Use persistent queue (SQLite, Redis with AOF) |
| Duplicate solves after recovery | Stale tasks reprocessed without dedup | Add idempotency keys; check if already solved |
| Recovery takes > RTO | Database backup too old | Increase checkpoint frequency |
| Failover to wrong region | DNS TTL too high | Reduce TTL to 60s before planned failovers |
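One way to add the idempotency keys mentioned in the table is to derive a stable key from a task's identifying fields. This is a sketch: the field names (`sitekey`, `pageurl`, `batch_id`) follow the earlier examples, and the in-memory `solved` dict stands in for a real store such as a database table or Redis hash.

```python
import hashlib

def idempotency_key(task):
    """Derive a stable key from the fields that identify a solve.
    Re-running the same task after recovery yields the same key."""
    raw = f"{task['sitekey']}|{task['pageurl']}|{task.get('batch_id', '')}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

solved = {}  # key -> solution; in production, a durable store

def solve_once(task, solve_fn):
    """Skip the API call if this task was already solved before a crash."""
    key = idempotency_key(task)
    if key in solved:
        return solved[key]
    solution = solve_fn(task)
    solved[key] = solution
    return solution
```

Note that CAPTCHA tokens expire quickly (reCAPTCHA tokens are typically valid for about two minutes), so deduplicate only within a short window and evict stale entries.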
## FAQ
**How often should I checkpoint?**
Every 5–10 completed tasks, or every 30 seconds — whichever comes first. More frequent checkpoints reduce RPO but add I/O overhead.
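The count-or-time policy can be captured in a small helper; the class and its defaults are illustrative:

```python
import time

class CheckpointTrigger:
    """Fires when either N tasks complete or T seconds pass,
    whichever comes first."""

    def __init__(self, every_n=10, every_seconds=30.0, clock=time.monotonic):
        self.every_n = every_n
        self.every_seconds = every_seconds
        self.clock = clock          # injectable for testing
        self.count = 0
        self.last = clock()

    def record(self):
        """Call after each completed task; True means checkpoint now."""
        self.count += 1
        now = self.clock()
        if self.count >= self.every_n or now - self.last >= self.every_seconds:
            self.count = 0
            self.last = now
            return True
        return False
```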
**Should I use SQLite or Redis for task persistence?**
SQLite for single-node setups (simpler, no extra infrastructure). Redis with AOF persistence for distributed systems (faster, built-in pub/sub).
**What if CaptchaAI itself has an outage?**
Queue tasks locally and retry when the API recovers. CaptchaAI has high uptime, but your pipeline should handle temporary unavailability gracefully with circuit breakers and retry logic.
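The circuit breaker mentioned above can be sketched in a few lines; the threshold and cooldown values are illustrative and should be tuned to your traffic:

```python
import time

class CircuitBreaker:
    """Stops hammering the API during an outage: after `threshold`
    consecutive failures the circuit opens and calls are rejected
    until `cooldown` seconds pass, then one trial call is allowed."""

    def __init__(self, threshold=5, cooldown=60.0, clock=time.monotonic):
        self.threshold = threshold
        self.cooldown = cooldown
        self.clock = clock          # injectable for testing
        self.failures = 0
        self.opened_at = None

    def allow(self):
        if self.opened_at is None:
            return True             # circuit closed: normal operation
        if self.clock() - self.opened_at >= self.cooldown:
            return True             # half-open: permit one trial request
        return False                # circuit open: reject the call

    def record_success(self):
        self.failures = 0
        self.opened_at = None       # close the circuit

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()
```

Wrap every API call in an `if breaker.allow():` check and record the outcome. When the API is unreachable, the open circuit keeps workers from burning retries while the persistent queue holds the backlog.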
## Next Steps
Plan for the worst — get your CaptchaAI API key and build disaster recovery into your pipeline from day one.