Manage workflows across multiple accounts on CAPTCHA-protected platforms — login, action, and data collection at scale.
Use Cases
| Scenario | Example |
|---|---|
| Social media management | Post/monitor across multiple brand accounts |
| E-commerce operations | Update listings on multiple seller accounts |
| Data collection | Gather data from accounts on different platforms |
| QA testing | Test user flows with different account types |
| Account monitoring | Check account status and notifications |
Architecture
┌────────────────┐ ┌──────────────┐ ┌───────────┐ ┌───────────────┐
│ Account │────▶│ Session │────▶│ CAPTCHA │────▶│ Workflow │
│ Registry │ │ Manager │ │ Solver │ │ Executor │
│ (credentials) │ │ (cookies, │ │ │ │ (per-account │
│ │ │ proxies) │ │ │ │ actions) │
└────────────────┘ └──────────────┘ └───────────┘ └───────────────┘
Implementation
Account Registry
import json
from dataclasses import dataclass, field, asdict
from typing import Optional
@dataclass
class Account:
id: str
platform: str
username: str
password: str
proxy: Optional[str] = None
cookies: dict = field(default_factory=dict)
last_login: Optional[float] = None
status: str = "active"
class AccountRegistry:
def __init__(self, filepath="accounts.json"):
self.filepath = filepath
self.accounts = {}
self._load()
def _load(self):
try:
with open(self.filepath, "r") as f:
data = json.load(f)
for item in data:
acct = Account(**item)
self.accounts[acct.id] = acct
except FileNotFoundError:
pass
def save(self):
data = [asdict(a) for a in self.accounts.values()]
with open(self.filepath, "w") as f:
json.dump(data, f, indent=2)
def add(self, account):
self.accounts[account.id] = account
self.save()
def get(self, account_id):
return self.accounts.get(account_id)
def get_by_platform(self, platform):
return [a for a in self.accounts.values() if a.platform == platform and a.status == "active"]
def update_status(self, account_id, status):
if account_id in self.accounts:
self.accounts[account_id].status = status
self.save()
Session Manager
import time
import requests
import pickle
import os
class SessionManager:
def __init__(self, sessions_dir="sessions"):
self.sessions_dir = sessions_dir
os.makedirs(sessions_dir, exist_ok=True)
self.sessions = {}
def get_session(self, account):
"""Get or create a requests session for an account."""
if account.id in self.sessions:
return self.sessions[account.id]
session = requests.Session()
# Set proxy if configured
if account.proxy:
session.proxies = {
"http": account.proxy,
"https": account.proxy,
}
# Load saved cookies
cookie_file = os.path.join(self.sessions_dir, f"{account.id}.cookies")
if os.path.exists(cookie_file):
with open(cookie_file, "rb") as f:
session.cookies = pickle.load(f)
self.sessions[account.id] = session
return session
def save_session(self, account):
"""Persist session cookies."""
if account.id in self.sessions:
cookie_file = os.path.join(self.sessions_dir, f"{account.id}.cookies")
with open(cookie_file, "wb") as f:
pickle.dump(self.sessions[account.id].cookies, f)
def clear_session(self, account_id):
if account_id in self.sessions:
del self.sessions[account_id]
cookie_file = os.path.join(self.sessions_dir, f"{account_id}.cookies")
if os.path.exists(cookie_file):
os.remove(cookie_file)
CAPTCHA-Aware Login
import time
import requests
class CaptchaSolver:
BASE = "https://ocr.captchaai.com"
def __init__(self, api_key):
self.api_key = api_key
def solve(self, params, initial_wait=10):
params["key"] = self.api_key
params["json"] = 1
resp = requests.post(f"{self.BASE}/in.php", data=params).json()
if resp["status"] != 1:
raise Exception(resp["request"])
task_id = resp["request"]
time.sleep(initial_wait)
for _ in range(60):
result = requests.get(
f"{self.BASE}/res.php",
params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
).json()
if result["request"] == "CAPCHA_NOT_READY":
time.sleep(5)
continue
if result["status"] == 1:
return result["request"]
raise Exception(result["request"])
raise TimeoutError("Timed out")
class LoginHandler:
def __init__(self, captcha_solver):
self.solver = captcha_solver
def login(self, session, account, login_config):
"""
login_config: {
"url": login page URL,
"submit_url": login form action URL,
"captcha_type": "recaptcha_v2" | "turnstile" | None,
"sitekey": "...",
"username_field": "username",
"password_field": "password",
"captcha_field": "g-recaptcha-response",
}
"""
# Get login page (for CSRF token / cookies)
session.get(login_config["url"])
payload = {
login_config.get("username_field", "username"): account.username,
login_config.get("password_field", "password"): account.password,
}
# Solve CAPTCHA if present
captcha_type = login_config.get("captcha_type")
if captcha_type:
if captcha_type == "recaptcha_v2":
token = self.solver.solve({
"method": "userrecaptcha",
"googlekey": login_config["sitekey"],
"pageurl": login_config["url"],
})
elif captcha_type == "turnstile":
token = self.solver.solve({
"method": "turnstile",
"sitekey": login_config["sitekey"],
"pageurl": login_config["url"],
})
else:
raise ValueError(f"Unknown captcha type: {captcha_type}")
captcha_field = login_config.get("captcha_field", "g-recaptcha-response")
payload[captcha_field] = token
submit_url = login_config.get("submit_url", login_config["url"])
resp = session.post(submit_url, data=payload, allow_redirects=True)
# Check login success
success = resp.status_code == 200 and "login" not in resp.url.lower()
if success:
account.last_login = time.time()
return success
Workflow Executor
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger("workflow")
class WorkflowExecutor:
def __init__(self, api_key, max_workers=5):
self.solver = CaptchaSolver(api_key)
self.registry = AccountRegistry()
self.sessions = SessionManager()
self.login_handler = LoginHandler(self.solver)
self.max_workers = max_workers
def run_for_account(self, account, workflow_fn, login_config):
"""Execute a workflow for a single account."""
session = self.sessions.get_session(account)
# Login if needed
if not account.last_login or time.time() - account.last_login > 3600:
logger.info(f"Logging in: {account.id}")
if not self.login_handler.login(session, account, login_config):
logger.error(f"Login failed: {account.id}")
self.registry.update_status(account.id, "login_failed")
return {"account": account.id, "status": "login_failed"}
self.sessions.save_session(account)
self.registry.save()
# Execute workflow
try:
result = workflow_fn(session, account)
return {"account": account.id, "status": "success", "data": result}
except Exception as e:
logger.error(f"Workflow failed for {account.id}: {e}")
return {"account": account.id, "status": "error", "error": str(e)}
def run_for_all(self, platform, workflow_fn, login_config):
"""Execute a workflow across all accounts on a platform."""
accounts = self.registry.get_by_platform(platform)
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
futures = {
pool.submit(self.run_for_account, acct, workflow_fn, login_config): acct
for acct in accounts
}
for future in as_completed(futures):
result = future.result()
results.append(result)
logger.info(f" {result['account']}: {result['status']}")
return results
Usage Example
executor = WorkflowExecutor("YOUR_API_KEY", max_workers=3)
# Define platform login config
login_config = {
"url": "https://platform.example.com/login",
"submit_url": "https://platform.example.com/api/login",
"captcha_type": "recaptcha_v2",
"sitekey": "6Le-wvkSAAAA...",
"username_field": "email",
"password_field": "password",
"captcha_field": "g-recaptcha-response",
}
# Define workflow
def check_notifications(session, account):
resp = session.get("https://platform.example.com/api/notifications")
data = resp.json()
return {
"unread": data.get("unread_count", 0),
"latest": data.get("notifications", [])[:5],
}
# Run across all accounts
results = executor.run_for_all("example_platform", check_notifications, login_config)
for r in results:
if r["status"] == "success":
print(f"{r['account']}: {r['data']['unread']} unread notifications")
else:
print(f"{r['account']}: {r['status']}")
Workflow Examples
Data Export
def export_data(session, account):
resp = session.get("https://platform.example.com/api/export")
filename = f"export_{account.id}.json"
with open(filename, "w") as f:
f.write(resp.text)
return {"file": filename, "size": len(resp.text)}
Status Check
def check_status(session, account):
resp = session.get("https://platform.example.com/api/account/status")
return resp.json()
Update Settings
def update_settings(session, account):
resp = session.post(
"https://platform.example.com/api/settings",
json={"timezone": "UTC", "notifications": True},
)
return {"updated": resp.status_code == 200}
Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| All logins failing | Sitekey changed | Re-extract sitekey from login page |
| Session cookies invalid | Expired cookies | Clear session and re-login |
| Rate limited | Too many concurrent logins | Reduce max_workers, add delays |
| Account locked | Too many failed attempts | Check credentials, reduce frequency |
FAQ
How many accounts can I manage concurrently?
Start with 3-5 concurrent workers and increase based on the platform's rate limits. Monitor for rate-limiting responses (429).
How do I handle account-specific proxies?
Set the proxy field on each Account object. The SessionManager automatically applies the proxy for that account's requests.
How do I rotate credentials securely?
Store credentials in environment variables or a secrets manager. Never hardcode credentials in source files.
Related Guides
Scale multi-step workflows — solve CAPTCHAs with CaptchaAI.
Discussions (0)
Join the conversation
Sign in to share your opinion.
Sign InNo comments yet.