A scraping pipeline starts simple: detect CAPTCHA, solve it, inject the token. Then you need logging. Then proxy rotation before solving. Then token caching after solving. Each feature tangles with the core loop. A plugin architecture lets you add behaviour at defined hook points without modifying the pipeline itself.
Pipeline Lifecycle Hooks
A CAPTCHA solve goes through four phases — plugins attach to any of them:
- before_submit — Modify parameters, select proxy, log the request
- after_submit — Record task ID, start timing
- before_result — Check cache before polling
- after_result — Cache token, log timing, validate response
Python: Hook-Based Pipeline
import requests
import time
from dataclasses import dataclass, field
from typing import Callable

API_KEY = "YOUR_API_KEY"
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"


@dataclass
class SolveContext:
    """Shared state flowing through the pipeline."""
    params: dict
    task_id: str | None = None
    result: str | None = None
    metadata: dict = field(default_factory=dict)
    skip_poll: bool = False


class CaptchaPipeline:
    """Plugin-based CAPTCHA solving pipeline."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self._hooks: dict[str, list[Callable]] = {
            "before_submit": [],
            "after_submit": [],
            "before_result": [],
            "after_result": [],
        }

    def register(self, hook: str, handler: Callable):
        """Register a handler for a lifecycle hook."""
        if hook not in self._hooks:
            raise ValueError(f"Unknown hook: {hook}. Valid: {list(self._hooks)}")
        self._hooks[hook].append(handler)

    def plugin(self, hook: str):
        """Decorator to register a plugin."""
        def decorator(fn):
            self.register(hook, fn)
            return fn
        return decorator

    def _run_hooks(self, hook: str, ctx: SolveContext):
        # Handlers run in registration order and share the same context object
        for handler in self._hooks[hook]:
            handler(ctx)

    def solve(self, params: dict, timeout: int = 180) -> str:
        # Copy params so plugins never mutate the caller's dict
        ctx = SolveContext(params=dict(params))

        # Phase 1: before_submit
        self._run_hooks("before_submit", ctx)

        # Phase 2: submit
        submit_params = {**ctx.params, "key": self.api_key, "json": 1}
        resp = requests.post(SUBMIT_URL, data=submit_params, timeout=30).json()
        if resp.get("status") != 1:
            raise RuntimeError(f"Submit failed: {resp.get('request')}")
        ctx.task_id = resp["request"]
        self._run_hooks("after_submit", ctx)

        # Phase 3: before_result (a plugin may set skip_poll with a cached result)
        self._run_hooks("before_result", ctx)
        if ctx.skip_poll and ctx.result:
            return ctx.result

        # Phase 4: poll every 5 seconds until solved, failed, or timed out
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            time.sleep(5)
            poll = requests.get(RESULT_URL, params={
                "key": self.api_key, "action": "get",
                "id": ctx.task_id, "json": 1,
            }, timeout=15).json()
            if poll.get("request") == "CAPCHA_NOT_READY":
                continue
            if poll.get("status") == 1:
                ctx.result = poll["request"]
                self._run_hooks("after_result", ctx)
                return ctx.result
            raise RuntimeError(f"Solve failed: {poll.get('request')}")
        raise RuntimeError("Timeout")

# --- Plugins ---
pipeline = CaptchaPipeline(API_KEY)


@pipeline.plugin("before_submit")
def logging_plugin(ctx: SolveContext):
    """Log every solve request."""
    method = ctx.params.get("method", "unknown")
    print(f"[LOG] Solving {method} CAPTCHA")
    ctx.metadata["start_time"] = time.monotonic()


@pipeline.plugin("after_result")
def timing_plugin(ctx: SolveContext):
    """Record solve duration."""
    start = ctx.metadata.get("start_time")
    if start is not None:
        duration = time.monotonic() - start
        ctx.metadata["duration_s"] = round(duration, 2)
        print(f"[LOG] Solved in {duration:.1f}s")


@pipeline.plugin("before_submit")
def proxy_plugin(ctx: SolveContext):
    """Attach a default proxy unless the caller supplied one."""
    ctx.params.setdefault("proxy", "http://user:pass@proxy.example.com:8080")
    ctx.params.setdefault("proxytype", "HTTP")


# Token cache plugin
_cache: dict[str, tuple[str, float]] = {}  # key -> (token, expiry)
CACHE_TTL = 90  # seconds


@pipeline.plugin("before_result")
def cache_check_plugin(ctx: SolveContext):
    """Return cached token if still valid."""
    cache_key = f"{ctx.params.get('method')}:{ctx.params.get('pageurl')}"
    if cache_key in _cache:
        token, expiry = _cache[cache_key]
        if time.monotonic() < expiry:
            # Set both fields together; solve() checks skip_poll and result
            ctx.result = token
            ctx.skip_poll = True
            print("[CACHE] Returning cached token")


@pipeline.plugin("after_result")
def cache_store_plugin(ctx: SolveContext):
    """Cache solved tokens."""
    cache_key = f"{ctx.params.get('method')}:{ctx.params.get('pageurl')}"
    _cache[cache_key] = (ctx.result, time.monotonic() + CACHE_TTL)


# --- Usage ---
token = pipeline.solve({
    "method": "turnstile",
    "sitekey": "0x4XXXXXXXXXXXXXXXXX",
    "pageurl": "https://example.com/login",
})
print(f"Token: {token[:30]}...")
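
One thing to note about the cache plugin above: because it is attached to before_result, the pipeline has already submitted a task by the time the cache is consulted. If avoiding the submit call matters, the cache can sit in front of the whole solve instead. The following is a minimal sketch of that idea, not part of the pipeline above; `make_cached_solver` and `fake_solve` are hypothetical names, and `fake_solve` stands in for `pipeline.solve` so the example runs without network access.

```python
import time
from typing import Callable


def make_cached_solver(
    solve: Callable[[dict], str],
    ttl: float = 90.0,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[dict], str]:
    """Wrap a solve function so a fresh cache hit skips the call entirely."""
    cache: dict[str, tuple[str, float]] = {}  # key -> (token, expiry)

    def cached(params: dict) -> str:
        key = f"{params.get('method')}:{params.get('pageurl')}"
        hit = cache.get(key)
        if hit is not None and clock() < hit[1]:
            return hit[0]  # still valid: no submit, no polling
        token = solve(params)
        cache[key] = (token, clock() + ttl)
        return token

    return cached


# Hypothetical stand-in for pipeline.solve that counts how often it is called
calls: list[dict] = []


def fake_solve(params: dict) -> str:
    calls.append(params)
    return f"token-{len(calls)}"


solver = make_cached_solver(fake_solve)
params = {"method": "turnstile", "pageurl": "https://example.com/login"}
first = solver(params)
second = solver(params)
print(first, second, len(calls))  # token-1 token-1 1
```

In real use you would pass `pipeline.solve` in place of `fake_solve`; the rest of the hooks keep working unchanged.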
JavaScript: Event-Based Pipeline
const API_KEY = "YOUR_API_KEY";
const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  #hooks = { beforeSubmit: [], afterSubmit: [], beforeResult: [], afterResult: [] };
  #apiKey;

  constructor(apiKey) {
    this.#apiKey = apiKey;
  }

  on(hook, handler) {
    // Own-property check so inherited names like "toString" are rejected
    if (!Object.hasOwn(this.#hooks, hook)) throw new Error(`Unknown hook: ${hook}`);
    this.#hooks[hook].push(handler);
    return this; // chainable
  }

  async #runHooks(hook, ctx) {
    // Awaiting each handler supports both sync and async plugins
    for (const handler of this.#hooks[hook]) {
      await handler(ctx);
    }
  }

  async solve(params) {
    const ctx = { params: { ...params }, taskId: null, result: null, meta: {}, skipPoll: false };

    await this.#runHooks("beforeSubmit", ctx);

    // Spread ctx.params first so a plugin cannot overwrite key or json
    const body = new URLSearchParams({ ...ctx.params, key: this.#apiKey, json: "1" });
    const resp = await (await fetch(SUBMIT_URL, { method: "POST", body })).json();
    if (resp.status !== 1) throw new Error(`Submit: ${resp.request}`);
    ctx.taskId = resp.request;
    await this.#runHooks("afterSubmit", ctx);

    await this.#runHooks("beforeResult", ctx);
    if (ctx.skipPoll && ctx.result) return ctx.result;

    // Poll every 5 seconds, up to 60 attempts
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const query = new URLSearchParams({
        key: this.#apiKey, action: "get", id: ctx.taskId, json: "1",
      });
      const poll = await (await fetch(`${RESULT_URL}?${query}`)).json();
      if (poll.request === "CAPCHA_NOT_READY") continue;
      if (poll.status === 1) {
        ctx.result = poll.request;
        await this.#runHooks("afterResult", ctx);
        return ctx.result;
      }
      throw new Error(`Solve: ${poll.request}`);
    }
    throw new Error("Timeout");
  }
}

// Register plugins
const pipeline = new CaptchaPipeline(API_KEY);

pipeline
  .on("beforeSubmit", (ctx) => {
    ctx.meta.startTime = Date.now();
    console.log(`[LOG] Solving ${ctx.params.method}`);
  })
  .on("afterResult", (ctx) => {
    const ms = Date.now() - ctx.meta.startTime;
    console.log(`[LOG] Solved in ${ms}ms`);
  })
  .on("beforeSubmit", (ctx) => {
    ctx.params.proxy = ctx.params.proxy || "http://user:pass@proxy.example.com:8080";
    ctx.params.proxytype = ctx.params.proxytype || "HTTP";
  });

// Usage (top-level await requires an ES module)
const token = await pipeline.solve({
  method: "turnstile",
  sitekey: "0x4XXXXXXXXXXXXXXXXX",
  pageurl: "https://example.com/login",
});
Plugin Ordering
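Plugins run in registration order, so you control execution priority by registering them in the sequence you want. Calling register() explicitly, rather than using the decorator, keeps that order visible in one place:
# Order matters: the proxy must be set before the rate limiter checks.
# rate_limit_plugin is a placeholder for a handler you define yourself.
pipeline.register("before_submit", proxy_plugin)       # runs first
pipeline.register("before_submit", rate_limit_plugin)  # runs second
pipeline.register("before_submit", logging_plugin)     # runs third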
For priority-based ordering, store (priority, handler) tuples and sort before execution.
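
As a rough illustration of the priority approach, here is a minimal, self-contained sketch. `PriorityHooks` is a hypothetical class written for this example, not part of the pipeline above. It assumes lower numbers run first and uses a sequence counter so that handlers with equal priority keep their registration order.

```python
from typing import Callable


class PriorityHooks:
    """Hook registry that runs handlers by ascending priority."""

    def __init__(self) -> None:
        # hook name -> list of (priority, sequence, handler)
        self._hooks: dict[str, list[tuple[int, int, Callable]]] = {}
        self._seq = 0  # tie-breaker that preserves registration order

    def register(self, hook: str, handler: Callable, priority: int = 100) -> None:
        self._hooks.setdefault(hook, []).append((priority, self._seq, handler))
        self._seq += 1

    def run(self, hook: str, ctx: dict) -> None:
        # Sort on (priority, sequence) only, so handlers are never compared
        entries = sorted(self._hooks.get(hook, []), key=lambda e: (e[0], e[1]))
        for _, _, handler in entries:
            handler(ctx)


hooks = PriorityHooks()
order: list[str] = []

# Registered out of order on purpose; priority decides the run order
hooks.register("before_submit", lambda ctx: order.append("logging"), priority=30)
hooks.register("before_submit", lambda ctx: order.append("proxy"), priority=10)
hooks.register("before_submit", lambda ctx: order.append("rate_limit"), priority=20)

hooks.run("before_submit", {})
print(order)  # ['proxy', 'rate_limit', 'logging']
```

Sorting at run time keeps registration cheap; if hooks fire very often, sorting once at registration would be a reasonable alternative.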
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| Plugin modifies wrong field | Context field name mismatch | Use SolveContext dataclass fields consistently |
| skip_poll set but no result | Cache plugin sets flag without setting ctx.result | Always set both skip_poll = True and ctx.result together |
| Plugin execution order wrong | Registration order determines execution | Register plugins in the order they should run |
| Async plugin blocks pipeline | Synchronous handler in async pipeline | Use async def handlers and await them |
| Plugin error crashes pipeline | No error isolation | Wrap hook execution in try/except per handler |
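
The error-isolation fix in the last row could look something like the sketch below. `run_hooks_isolated` is a hypothetical replacement for the pipeline's hook runner, shown here with a plain dict as context so it runs on its own. It logs each failure and carries on with the remaining handlers.

```python
import logging
from typing import Callable

logger = logging.getLogger("captcha.pipeline")


def run_hooks_isolated(handlers: list[Callable], ctx: dict) -> list[tuple[str, Exception]]:
    """Run every handler; collect failures instead of letting one abort the rest."""
    failures: list[tuple[str, Exception]] = []
    for handler in handlers:
        try:
            handler(ctx)
        except Exception as exc:  # deliberately broad: a plugin may raise anything
            name = getattr(handler, "__name__", repr(handler))
            logger.warning("Plugin %s failed: %s", name, exc)
            failures.append((name, exc))
    return failures


def broken_plugin(ctx: dict) -> None:
    raise KeyError("missing field")


def tagging_plugin(ctx: dict) -> None:
    ctx["tagged"] = True


ctx: dict = {}
failures = run_hooks_isolated([broken_plugin, tagging_plugin], ctx)
print(ctx)                                # {'tagged': True}
print([name for name, _ in failures])     # ['broken_plugin']
```

Note the trade-off: once every exception is swallowed, a plugin can no longer abort a solve by raising. If you rely on that behaviour, re-raise a dedicated exception type and isolate only the rest.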
FAQ
How many plugins can I register per hook?
No limit. Each hook maintains an ordered list. Performance impact is negligible for dozens of plugins — the CaptchaAI API call itself is the bottleneck.
Can a plugin cancel the solve?
Yes. A before_submit plugin can raise an exception to abort the solve, or set a flag in ctx.metadata that downstream code checks. For cleaner cancellation, add a ctx.cancelled boolean and check it between phases.
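
A minimal sketch of the `cancelled` flag approach follows. The `Ctx` class, the `cancel_reason` field, `SolveCancelled`, `run_phase`, and `blocklist_plugin` are all hypothetical names for this example; the original SolveContext has no such fields. The runner checks the flag after each handler and stops before the next phase begins.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Ctx:
    params: dict
    cancelled: bool = False               # hypothetical field, set by plugins
    cancel_reason: Optional[str] = None   # hypothetical field, for error messages
    trace: list = field(default_factory=list)  # phases that completed


class SolveCancelled(Exception):
    """Raised by the runner when a plugin sets ctx.cancelled."""


def run_phase(name: str, handlers: list[Callable], ctx: Ctx) -> None:
    for handler in handlers:
        handler(ctx)
        if ctx.cancelled:
            raise SolveCancelled(f"{name}: {ctx.cancel_reason}")
    ctx.trace.append(name)


def blocklist_plugin(ctx: Ctx) -> None:
    # Example policy: refuse to solve for a blocked host
    if "blocked.example.com" in ctx.params.get("pageurl", ""):
        ctx.cancelled = True
        ctx.cancel_reason = "pageurl is on the blocklist"


ctx = Ctx(params={"pageurl": "https://blocked.example.com/login"})
try:
    run_phase("before_submit", [blocklist_plugin], ctx)
    run_phase("after_submit", [], ctx)  # never reached for a blocked URL
    outcome = "submitted"
except SolveCancelled as exc:
    outcome = f"cancelled ({exc})"
print(outcome)  # cancelled (before_submit: pageurl is on the blocklist)
```

Compared with raising directly from the plugin, the flag lets the pipeline decide how to stop, which keeps plugins free of control-flow concerns.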
How does this differ from middleware?
Middleware forms a chain where each layer wraps the next. Plugins hook into specific lifecycle points without wrapping. Middleware is better for request/response transformation; plugins are better for side effects at defined moments.
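
To make the contrast concrete, here is a small sketch with both styles side by side. Every name in it is hypothetical, and `fake_solver` stands in for the real submit-and-poll call so the example runs offline. The middleware wraps the solver and transforms both request and response; the hook fires at one named moment and only produces a side effect.

```python
from typing import Callable

Solver = Callable[[dict], str]


def fake_solver(params: dict) -> str:
    """Stand-in for the real submit-and-poll call."""
    return f"token-for-{params['pageurl']}"


# Middleware: each layer wraps the next and sees both request and response.
def uppercase_middleware(next_solver: Solver) -> Solver:
    def wrapped(params: dict) -> str:
        token = next_solver({**params, "via": "middleware"})  # transform request
        return token.upper()                                   # transform response
    return wrapped


# Hook: runs at one named moment and returns nothing.
events: list[str] = []


def log_hook(ctx: dict) -> None:
    events.append(f"solving {ctx['params']['pageurl']}")


def solve_with_hooks(params: dict, before: list[Callable], solver: Solver) -> str:
    ctx = {"params": params}
    for hook in before:
        hook(ctx)  # side effects only; the solver call itself is not wrapped
    return solver(ctx["params"])


params = {"pageurl": "https://example.com/login"}
wrapped_token = uppercase_middleware(fake_solver)(params)
hooked_token = solve_with_hooks(params, [log_hook], fake_solver)

print(wrapped_token)  # TOKEN-FOR-HTTPS://EXAMPLE.COM/LOGIN
print(hooked_token)   # token-for-https://example.com/login
print(events)         # ['solving https://example.com/login']
```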
Next Steps
Build extensible CAPTCHA solving pipelines — get your CaptchaAI API key and register your first plugin.