Overview
The agent worker is a BullMQ-based job processor that runs headless Chromium browsers via Playwright to execute user prompts across AI providers (ChatGPT, Claude, Gemini, Perplexity, Google AI Overview). It extracts responses and sources, stores them in ClickHouse, and triggers analysis.
Tech Stack
- Queue: BullMQ (Redis-backed)
- Browser Automation: Playwright (Chromium)
- Proxy Management: Custom rotating proxy pool with scoring
- Stealth: Custom CDP-based stealth configuration
- Language: TypeScript with ES modules
Architecture
Job Flow
┌─────────────┐
│   Web App   │  submits job group
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────────┐
│ Redis (Job Queue + Progress Tracking)   │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│ Worker: One BullMQ worker per provider  │
│   - chatgpt-worker                      │
│   - claude-worker                       │
│   - gemini-worker                       │
│   - perplexity-worker                   │
│   - ai-overview-worker                  │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│ Agent Handler                           │
│   1. Fetch proxies from pool            │
│   2. Launch CDP browser (warm or cold)  │
│   3. Navigate to provider               │
│   4. Run prompts sequentially           │
│   5. Extract responses + sources        │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│ Store Results                           │
│   - ClickHouse: prompt_responses        │
│   - Redis: Update progress              │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│ Trigger Analysis (Background)           │
│   - Analyzes brand mentions             │
│   - Stores in prompt_analysis table     │
└─────────────────────────────────────────┘
Worker Implementation
Worker Initialization
Each provider gets its own BullMQ worker:
import { Worker } from "bullmq";
import { PROVIDER_LIST } from "@oneglanse/types";
import { getQueueName } from "@oneglanse/services";
import { handleJob } from "./worker/jobHandler.js";

export let workers: Worker[] = [];

async function startWorkers() {
  await waitForRedis();
  const workerConcurrency = env.AGENT_WORKER_CONCURRENCY || 1;
  const connection = {
    host: env.REDIS_HOST,
    port: env.REDIS_PORT,
    password: env.REDIS_PASSWORD,
  };
  workers = PROVIDER_LIST.map((provider) => {
    const w = new Worker(getQueueName(provider), handleJob, {
      connection,
      concurrency: workerConcurrency,
      lockDuration: 15 * 60 * 1000, // 15 minutes
      stalledInterval: 60 * 1000,
      maxStalledCount: 5,
    });
    w.on("active", (job) => plog.log("Job started", job.id));
    w.on("completed", (job) => plog.success("Job completed", job.id));
    w.on("failed", (job, err) => plog.error("Job failed", job?.id, err));
    return w;
  });
}
Reference: apps/agent/src/worker.ts:12-55
Job Handler
The job handler processes one job at a time; each job carries every prompt for a single provider:
apps/agent/src/worker/jobHandler.ts
export async function handleJob(job: Job<ProviderJobData>): Promise<boolean> {
  const { provider, jobGroupId, prompts, user_id, workspace_id } = job.data;

  // Generate execution timestamp
  const executionTime = new Date().toISOString();
  const promptPayload: PromptPayload = {
    user_id,
    workspace_id,
    prompts: prompts.map(({ id, prompt }) => ({ id, prompt })),
    created_at: executionTime,
  };

  // Initialize progress tracking in Redis
  const progressKey = `job:${jobGroupId}:result`;
  await updateProgress(progressKey, provider, "running", null);

  let wrapped: AgentResult = { status: "rejected", data: [] };

  // Run the agent
  try {
    const { label, factory } = providerConfig[provider];
    const result = await agentHandler(
      label,
      factory,
      promptPayload,
      provider
    );
    wrapped = {
      status: result.length > 0 ? "fulfilled" : "rejected",
      data: result,
    };
  } catch (err) {
    plog.error(`failed:`, toErrorMessage(err));
  }

  // Store successful results
  if (wrapped.status === "fulfilled" && wrapped.data.length > 0) {
    await storePromptResponses({
      results: { [provider]: wrapped },
      userId: user_id,
      workspaceId: workspace_id,
      promptRunAt: executionTime,
    });

    // Trigger analysis asynchronously
    runAnalysisInBackground({
      workspaceId: workspace_id,
      userId: user_id,
      provider,
      jobGroupId,
    });
  }

  // Update final progress
  const finalStatus = wrapped.status === "fulfilled" ? "completed" : "failed";
  await updateProgress(progressKey, provider, finalStatus, wrapped.data.length);
  return true;
}
Reference: apps/agent/src/worker/jobHandler.ts:81-188
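The status mapping inside the handler can be read in isolation as two small pure functions. The sketch below is illustrative only: `wrapResult`, `toFinalStatus`, and the trimmed-down `AskPromptResult` shape are assumptions made for this example, not names from the repository.

```typescript
// Simplified stand-in for the repo's result type (assumption: the real type has more fields).
interface AskPromptResult {
  promptId: string;
  response: string;
}

interface AgentResult {
  status: "fulfilled" | "rejected";
  data: AskPromptResult[];
}

type FinalStatus = "completed" | "failed";

// An empty result array counts as a rejection, mirroring
// `result.length > 0 ? "fulfilled" : "rejected"` in the handler.
function wrapResult(results: AskPromptResult[]): AgentResult {
  return { status: results.length > 0 ? "fulfilled" : "rejected", data: results };
}

// Maps the wrapped status onto the value written to the Redis progress record.
function toFinalStatus(wrapped: AgentResult): FinalStatus {
  return wrapped.status === "fulfilled" ? "completed" : "failed";
}

const withData = wrapResult([{ promptId: "p1", response: "Hello" }]);
const withoutData = wrapResult([]);
console.log(toFinalStatus(withData), toFinalStatus(withoutData)); // completed failed
```

Note that `handleJob` catches agent errors and returns `true` in both cases, so as long as nothing outside the `try` block throws, BullMQ records the job as completed and a provider failure is only visible in the Redis progress record.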
Playwright Browser Automation
CDP Browser Launch
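The worker spawns Chromium itself and attaches Playwright over the Chrome DevTools Protocol (CDP), which gives it direct control over the browser process, debugging port, and user data directory: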
apps/agent/src/lib/browser/launch.ts
export async function launchContext(
  provider: Provider,
): Promise<{
  browser: Browser;
  context: BrowserContext;
  proxy: string | null;
  cleanup: () => Promise<void>;
}> {
  await cleanupStaleCdpDirs();

  // Get proxy from pool
  let proxy = getNextProxy();
  if (!proxy) {
    await fetchProxies({ forceRefresh: true });
    proxy = getNextProxy();
  }

  // Spawn Chrome with CDP
  const port = await getFreePort();
  const userDataDir = `/tmp/cdp-${provider}-${port}`;
  const chromeProcess = spawnChromiumCDP(port, userDataDir);
  const wsEndpoint = await waitForCDPEndpoint(port);
  const browser = await chromium.connectOverCDP(wsEndpoint);

  // Create stealth context
  const context = await browser.newContext({
    viewport: { width: 1920, height: 1080 },
    ...(proxy ? { proxy: { server: proxy } } : {}),
    ...STEALTH_CONTEXT_OPTIONS,
  });
  await context.addInitScript(STEALTH_INIT_SCRIPT);

  // ... (definition of `cleanup` is elided from this excerpt)
  return { browser, context, proxy, cleanup };
}
Reference: apps/agent/src/lib/browser/launch.ts:44-134
Warm Browser Pool
To avoid repeated browser spawns, the agent maintains a warm pool:
apps/agent/src/core/agentHandler.ts
const warmFactory: AgentFactory = async () => {
  const warm = await getWarmBrowser(provider).catch(() => null);
  if (warm) {
    const config = PROVIDER_CONFIGS[provider];
    try {
      // Reuse existing browser, just navigate to clean slate
      await navigateWithRetry(warm.page, config.url, {
        waitUntil: "domcontentloaded",
        timeout: 30_000,
      });
      if (config.postNavigationHook) {
        await config.postNavigationHook(warm.page);
      }
      return {
        browser: warm.browser,
        context: warm.context,
        page: warm.page,
        proxy: warm.proxy ?? undefined,
        cleanup: warm.cleanup ?? undefined,
      };
    } catch {
      // Navigation failed, close and fall back to cold factory
      await warm.cleanup?.().catch(() => {});
    }
  }
  return agentFactory(); // Cold start
};
Reference: apps/agent/src/core/agentHandler.ts:31-58
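The warm-then-cold fallback is easier to see with the browser replaced by stubs. This is a minimal sketch, not the repository's code: `Session`, `acquireSession`, and the three callback parameters are hypothetical names introduced for the example.

```typescript
// Hypothetical minimal session shape; the real one carries browser, context, and page.
interface Session {
  kind: "warm" | "cold";
  cleanup: () => Promise<void>;
}

// Try the warm path first; on any failure, clean up the warm session and cold-start.
async function acquireSession(
  getWarm: () => Promise<Session | null>,
  prepare: (session: Session) => Promise<void>,
  coldStart: () => Promise<Session>,
): Promise<Session> {
  const warm = await getWarm().catch(() => null);
  if (warm) {
    try {
      await prepare(warm); // e.g. navigate back to the provider's start page
      return warm;
    } catch {
      await warm.cleanup().catch(() => {}); // a failing cleanup must not block the fallback
    }
  }
  return coldStart();
}

// Stubbed run: the warm session exists but its navigation fails.
let warmCleanedUp = false;
const acquired = acquireSession(
  async () => ({ kind: "warm", cleanup: async () => { warmCleanedUp = true; } }),
  async () => { throw new Error("navigation timed out"); },
  async () => ({ kind: "cold", cleanup: async () => {} }),
);
acquired.then((session) => console.log(session.kind, warmCleanedUp)); // cold true
```

The design choice worth noting is that both the warm lookup and the warm cleanup swallow their errors, so a broken warm browser can only slow a job down, never fail it.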
Stealth Configuration
The agent applies fixed context options (user agent, locale, timezone, geolocation) and an init script that patches common automation fingerprints:
export const STEALTH_CONTEXT_OPTIONS = {
  userAgent: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36...",
  locale: "en-US",
  timezoneId: "America/New_York",
  permissions: ["geolocation"],
  geolocation: { latitude: 40.7128, longitude: -74.0060 },
  colorScheme: "light" as const,
};

export const STEALTH_INIT_SCRIPT = `
  // Override navigator.webdriver
  Object.defineProperty(navigator, 'webdriver', { get: () => false });
  // Override chrome runtime
  window.chrome = { runtime: {} };
  // Override permissions; bind so the original query keeps its receiver
  // (calling it unbound throws "Illegal invocation")
  const originalQuery = window.navigator.permissions.query.bind(window.navigator.permissions);
  window.navigator.permissions.query = (parameters) => (
    parameters.name === 'notifications' ?
      Promise.resolve({ state: Notification.permission }) :
      originalQuery(parameters)
  );
`;
Provider Integrations
Provider Configuration
Each provider has a dedicated configuration:
apps/agent/src/core/providers/index.ts
export const PROVIDER_CONFIGS: Record<Provider, ProviderConfig> = {
  gemini: geminiConfig,
  chatgpt: chatgptConfig,
  perplexity: perplexityConfig,
  claude: claudeConfig,
  "ai-overview": aiOverviewConfig,
};
Reference: apps/agent/src/core/providers/index.ts:19-25
Provider Config Structure
Each provider config defines:
export interface ProviderConfig {
  label: string;           // Display name
  url: string;             // Provider URL
  requiresWarmup: boolean; // Whether to warm up editor
  skip?: boolean;          // Skip this provider
  postNavigationHook?: (page: Page) => Promise<void>;

  // Steps
  askPrompt: (page: Page, prompt: string) => Promise<void>;
  extractResponse: (page: Page) => Promise<string>;
  extractSources: (page: Page, response: string) => Promise<Source[]>;
}
Example: ChatGPT Provider
apps/agent/src/core/providers/chatgpt/index.ts
export const chatgptConfig: ProviderConfig = {
  label: "ChatGPT",
  url: "https://chatgpt.com",
  requiresWarmup: true,
  askPrompt: async (page, prompt) => {
    const editor = await findEditor(page);
    await editor.fill(prompt);
    const sendButton = await findSendButton(page);
    await sendButton.click();
    await waitForFinish(page);
  },
  extractResponse: async (page) => {
    const responseElement = await findResponseElement(page);
    return toMarkdown(await responseElement.innerHTML());
  },
  extractSources: extractChatGPTSources,
};
Proxy Management
Proxy Pool
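The agent maintains a scored proxy pool. A success raises a proxy's score by 5 (capped at 100), a failure lowers it by 10 (floored at 0), and three consecutive failures evict the proxy: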
apps/agent/src/lib/browser/proxy/pool.ts
export function getNextProxy(): string | null {
  if (proxyRecords.length === 0) return null;

  // Sort by score (descending), then by last used time (ascending)
  proxyRecords.sort((a, b) => {
    if (b.score !== a.score) return b.score - a.score;
    return a.lastUsedAt - b.lastUsedAt;
  });

  const record = proxyRecords[0];
  record.lastUsedAt = Date.now();
  return record.proxy;
}

export function recordProxyResult(
  proxy: string,
  success: boolean,
  reason?: string,
  provider?: Provider,
): void {
  const record = proxyRecords.find((r) => r.proxy === proxy);
  if (!record) return;

  if (success) {
    record.score = Math.min(100, record.score + 5);
    record.consecutiveFailures = 0;
  } else {
    record.score = Math.max(0, record.score - 10);
    record.consecutiveFailures++;
    if (record.consecutiveFailures >= 3) {
      // Remove proxy from pool
      proxyRecords = proxyRecords.filter((r) => r.proxy !== proxy);
    }
  }
}
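A short worked trace makes the scoring arithmetic concrete. The sketch below re-implements the same rules on a two-proxy pool; the starting score of 50, the proxy URLs, and the names `pool`, `report`, and `best` are assumptions for illustration (the source does not show the initial score), and unlike `getNextProxy` the selector here does not update `lastUsedAt`.

```typescript
interface ProxyRecord {
  proxy: string;
  score: number;
  lastUsedAt: number;
  consecutiveFailures: number;
}

// Assumption: new proxies start at 50; the real initial score is not shown in the source.
let pool: ProxyRecord[] = ["http://a:8080", "http://b:8080"].map((proxy) => ({
  proxy,
  score: 50,
  lastUsedAt: 0,
  consecutiveFailures: 0,
}));

// Same rules as recordProxyResult: +5 on success, -10 on failure, evict after 3 in a row.
function report(proxy: string, success: boolean): void {
  const rec = pool.find((r) => r.proxy === proxy);
  if (!rec) return;
  if (success) {
    rec.score = Math.min(100, rec.score + 5);
    rec.consecutiveFailures = 0;
  } else {
    rec.score = Math.max(0, rec.score - 10);
    rec.consecutiveFailures++;
    if (rec.consecutiveFailures >= 3) pool = pool.filter((r) => r.proxy !== proxy);
  }
}

// Highest score first; ties go to the least recently used proxy.
function best(): string | null {
  const sorted = pool.slice().sort((x, y) => y.score - x.score || x.lastUsedAt - y.lastUsedAt);
  return sorted[0]?.proxy ?? null;
}

report("http://a:8080", true);  // a: 50 -> 55
report("http://b:8080", false); // b: 50 -> 40, 1 consecutive failure
report("http://b:8080", false); // b: 40 -> 30, 2 consecutive failures
console.log(best());            // http://a:8080
report("http://b:8080", false); // b: 30 -> 20, 3 consecutive failures, evicted
console.log(pool.length);       // 1
```

Because a failure costs twice what a success earns, a proxy needs two clean runs to recover from one bad one, which biases selection toward consistently healthy proxies.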
Proxy Retry Logic
The agent makes up to three attempts, refreshing the proxy list between failed attempts and rethrowing the last error if all of them fail:
export async function runWithProxyPool(
  label: string,
  factory: AgentFactory,
  payload: PromptPayload,
  provider: Provider,
  refreshProxies: () => Promise<void>,
): Promise<AskPromptResult[]> {
  const maxAttempts = 3;
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const { browser, context, page, proxy, cleanup } = await factory();
      try {
        const results = await runAgents(payload, page, provider);
        if (proxy) {
          recordProxyResult(proxy, true, undefined, provider);
        }
        return results;
      } finally {
        await cleanup?.();
      }
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        await refreshProxies();
      }
    }
  }
  throw lastError;
}
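Stripped of the browser details, the loop above is a bounded retry with a refresh step between attempts. The following sketch shows that shape with a stubbed attempt function; `retryWithRefresh` and its parameters are hypothetical names for this example, not part of the codebase.

```typescript
// Run `attempt` up to `maxAttempts` times, calling `refresh` between failures
// and rethrowing the last error if every attempt fails.
async function retryWithRefresh<T>(
  attempt: (n: number) => Promise<T>,
  refresh: () => Promise<void>,
  maxAttempts = 3,
): Promise<T> {
  let lastError: unknown;
  for (let n = 1; n <= maxAttempts; n++) {
    try {
      return await attempt(n);
    } catch (err) {
      lastError = err;
      if (n < maxAttempts) await refresh(); // no refresh after the final failure
    }
  }
  throw lastError;
}

// Stub: fails twice (as a blocked proxy might), then succeeds.
let refreshCount = 0;
const outcome = retryWithRefresh(
  async (n) => {
    if (n < 3) throw new Error(`attempt ${n} blocked`);
    return `succeeded on attempt ${n}`;
  },
  async () => { refreshCount++; },
);
outcome.then((msg) => console.log(msg, refreshCount)); // succeeded on attempt 3 2
```

The excerpt above only shows `recordProxyResult` being called on the success path; whether failed proxies are penalized elsewhere is not visible from this section.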
Job Processing Flow
1. Submit Job Group
The web app submits a job group with one job per enabled provider:
packages/services/src/agent/jobs.ts
export async function submitAgentJobGroup(args: {
  workspaceId: string;
  userId: string;
}): Promise<SubmitAgentJobResult> {
  const { workspaceId, userId } = args;

  const prompts = await fetchUserPromptsForWorkspace({ workspaceId });
  if (!prompts || prompts.length === 0) {
    return { status: "empty" };
  }

  const jobGroupId = randomUUID();
  const workspace = await getWorkspaceById({ workspaceId });
  const enabledProviders = JSON.parse(workspace.enabledProviders) as Provider[];

  // Initialize progress in Redis
  const progress = {
    status: "pending",
    updateId: 0,
    providers: Object.fromEntries(enabledProviders.map(p => [p, "pending"])),
    results: Object.fromEntries(enabledProviders.map(p => [p, 0])),
    stats: {
      totalPrompts: prompts.length,
      expectedResponses: prompts.length * enabledProviders.length,
      actualResponses: 0,
    },
  };
  await redis.set(
    `job:${jobGroupId}:result`,
    JSON.stringify(progress),
    "EX",
    60 * 60
  );

  // Submit one job per provider
  await Promise.all(
    enabledProviders.map((provider) =>
      getProviderQueue(provider).add("run-agent", {
        jobGroupId,
        provider,
        prompts,
        user_id: userId,
        workspace_id: workspaceId,
      })
    )
  );

  return { status: "queued", jobGroupId };
}
Reference: packages/services/src/agent/jobs.ts:18-72
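The progress record written here is later updated once per provider. Since `updateProgress` itself is not shown, the sketch below is only one plausible way such an update could fold into the record: `initProgress`, `applyUpdate`, and in particular the rule for deriving the overall status are assumptions for illustration.

```typescript
type ProviderStatus = "pending" | "running" | "completed" | "failed";

interface JobProgress {
  status: ProviderStatus;
  updateId: number;
  providers: Record<string, ProviderStatus>;
  results: Record<string, number>;
  stats: { totalPrompts: number; expectedResponses: number; actualResponses: number };
}

// Builds the same initial shape that submitAgentJobGroup stores in Redis.
function initProgress(providerNames: string[], totalPrompts: number): JobProgress {
  const providers: Record<string, ProviderStatus> = {};
  const results: Record<string, number> = {};
  for (const name of providerNames) {
    providers[name] = "pending";
    results[name] = 0;
  }
  return {
    status: "pending",
    updateId: 0,
    providers,
    results,
    stats: { totalPrompts, expectedResponses: totalPrompts * providerNames.length, actualResponses: 0 },
  };
}

// Hypothetical reducer. Assumed rule: the group is "running" until every provider
// has finished, then "completed" if at least one provider succeeded, else "failed".
function applyUpdate(
  prev: JobProgress,
  provider: string,
  next: ProviderStatus,
  count: number | null,
): JobProgress {
  const providers = { ...prev.providers, [provider]: next };
  const results = { ...prev.results, [provider]: count ?? prev.results[provider] ?? 0 };
  const actualResponses = Object.values(results).reduce((sum, n) => sum + n, 0);
  const states = Object.values(providers);
  const allDone = states.every((s) => s === "completed" || s === "failed");
  const overall: ProviderStatus = !allDone
    ? "running"
    : states.some((s) => s === "completed") ? "completed" : "failed";
  return {
    ...prev,
    status: overall,
    updateId: prev.updateId + 1,
    providers,
    results,
    stats: { ...prev.stats, actualResponses },
  };
}

let progress = initProgress(["chatgpt", "claude"], 3);
progress = applyUpdate(progress, "chatgpt", "running", null);
progress = applyUpdate(progress, "chatgpt", "completed", 3);
progress = applyUpdate(progress, "claude", "failed", 0);
console.log(progress.status, progress.stats.actualResponses, progress.stats.expectedResponses);
// completed 3 6
```

Comparing `actualResponses` with `expectedResponses` (3 of 6 here) is what lets a client show partial success when some providers fail.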
2. Process Job
The worker picks up the job and runs the agent:
1. Update Redis progress to "running"
2. Launch CDP browser with proxy (or reuse a warm browser)
3. Navigate to provider URL
4. For each prompt:
   a. Input prompt into editor
   b. Wait for response to complete
   c. Extract response text (markdown)
   d. Extract sources (title, URL, citation)
5. Store results in ClickHouse
6. Trigger analysis in background
7. Update Redis progress to "completed" (or "failed" if no responses were extracted)
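The per-prompt part of this flow can be sketched as a loop over the three `ProviderConfig` steps. To keep the example runnable without a browser, it is generic over the page type and driven by a fake page; `runPrompts`, `StepConfig`, `PromptResult`, and the skip-on-failure behaviour are assumptions, not the repository's `runAgents` implementation.

```typescript
interface Source { title: string; url: string }

// The three step functions from ProviderConfig, generic over the page type so a
// fake page can stand in for Playwright's Page.
interface StepConfig<P> {
  askPrompt: (page: P, prompt: string) => Promise<void>;
  extractResponse: (page: P) => Promise<string>;
  extractSources: (page: P, response: string) => Promise<Source[]>;
}

interface PromptResult { promptId: string; response: string; sources: Source[] }

async function runPrompts<P>(
  page: P,
  config: StepConfig<P>,
  prompts: { id: string; prompt: string }[],
): Promise<PromptResult[]> {
  const results: PromptResult[] = [];
  // Sequential on purpose: one page holds one conversation at a time.
  for (const { id, prompt } of prompts) {
    try {
      await config.askPrompt(page, prompt);
      const response = await config.extractResponse(page);
      const sources = await config.extractSources(page, response);
      results.push({ promptId: id, response, sources });
    } catch (err) {
      // Assumption: a failed prompt is skipped rather than aborting the whole run.
      console.warn(`prompt ${id} failed:`, err);
    }
  }
  return results;
}

// Fake page and config that simply echo the prompt back.
const fakePage = { lastPrompt: "" };
const echoConfig: StepConfig<typeof fakePage> = {
  askPrompt: async (page, prompt) => { page.lastPrompt = prompt; },
  extractResponse: async (page) => `echo: ${page.lastPrompt}`,
  extractSources: async () => [{ title: "Example", url: "https://example.com" }],
};

runPrompts(fakePage, echoConfig, [{ id: "p1", prompt: "hi" }])
  .then((r) => console.log(r[0]?.response)); // echo: hi
```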
3. Store Results
Results are stored in ClickHouse:
await clickhouse.insert({
  table: "analytics.prompt_responses",
  values: responses.map(r => ({
    id: randomUUID(),
    prompt_id: r.promptId,
    prompt: r.prompt,
    user_id: userId,
    workspace_id: workspaceId,
    model: r.model,
    model_provider: provider,
    response: r.response,
    sources: r.sources,
    is_analysed: false,
    prompt_run_at: executionTime,
    created_at: new Date(),
  })),
  format: "JSONEachRow",
});
Adding New Providers
To add a new AI provider:
1. Create Provider Config
Create apps/agent/src/core/providers/myprovider/index.ts:
import type { ProviderConfig } from "../types.js";
import { extractMyProviderSources } from "./lib/extractSources.js";
// Also import toMarkdown, the HTML-to-Markdown helper used by the existing providers.

export const myProviderConfig: ProviderConfig = {
  label: "MyProvider",
  url: "https://myprovider.com/chat",
  requiresWarmup: false,
  askPrompt: async (page, prompt) => {
    // Find input element (page.locator is synchronous, so no await is needed)
    const input = page.locator('[data-testid="chat-input"]');
    await input.fill(prompt);
    // Submit
    const sendBtn = page.locator('button[type="submit"]');
    await sendBtn.click();
    // Wait for response
    await page.waitForSelector('.response-complete');
  },
  extractResponse: async (page) => {
    const responseEl = page.locator('.response-content').last();
    const html = await responseEl.innerHTML();
    return toMarkdown(html);
  },
  extractSources: extractMyProviderSources,
};
2. Register Provider
Add to apps/agent/src/core/providers/index.ts:
import { myProviderConfig } from "./myprovider/index.js";

export const PROVIDER_CONFIGS: Record<Provider, ProviderConfig> = {
  // ... existing providers
  myprovider: myProviderConfig,
};
3. Add to Type Definitions
Update packages/types/src/provider.ts:
export const PROVIDER_LIST = [
  "chatgpt",
  "claude",
  "gemini",
  "perplexity",
  "ai-overview",
  "myprovider", // Add here
] as const;

export type Provider = (typeof PROVIDER_LIST)[number];
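Because `Provider` is derived from `PROVIDER_LIST`, the same array can also back a runtime check, which is useful for values that arrive as untyped JSON such as a workspace's stored provider list. The helpers `isProvider` and `parseEnabledProviders` below are hypothetical additions sketched for illustration; the list is repeated so the example runs on its own.

```typescript
const PROVIDER_LIST = ["chatgpt", "claude", "gemini", "perplexity", "ai-overview"] as const;
type Provider = (typeof PROVIDER_LIST)[number];

// Type guard: narrows an unknown value to Provider using the runtime list.
function isProvider(value: unknown): value is Provider {
  return typeof value === "string" && (PROVIDER_LIST as readonly string[]).includes(value);
}

// Hypothetical helper: parse a stored JSON array and drop entries that are not
// known providers. JSON.parse still throws on malformed input.
function parseEnabledProviders(json: string): Provider[] {
  const parsed: unknown = JSON.parse(json);
  return Array.isArray(parsed) ? parsed.filter(isProvider) : [];
}

// "myprovider" is dropped until it has been added to PROVIDER_LIST.
console.log(parseEnabledProviders('["chatgpt","myprovider","gemini"]').join(","));
// chatgpt,gemini
```

A guard like this would catch the case where the database default lists a provider that the type definitions do not yet include, instead of failing later at `PROVIDER_CONFIGS[provider]`.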
4. Update Database
Add the provider to the default enabled providers in the schema:
packages/db/src/schema/workspace.ts
const DEFAULT_PROVIDERS_JSON =
  '["chatgpt","claude","perplexity","gemini","ai-overview","myprovider"]';
Environment Variables
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
# Worker concurrency
AGENT_WORKER_CONCURRENCY=1
# ClickHouse
CLICKHOUSE_URL=http://localhost:8123
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=
CLICKHOUSE_DB=analytics
# Proxy (optional)
PROXY_API_URL=https://proxy-api.example.com
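Environment variables always arrive as strings, so numeric settings such as `REDIS_PORT` and `AGENT_WORKER_CONCURRENCY` need parsing before they reach BullMQ. The sketch below shows one way to do that with fallbacks; `loadWorkerEnv`, `parsePositiveInt`, and the `WorkerEnv` shape are hypothetical and do not describe how the repository's `env` object is actually built.

```typescript
interface WorkerEnv {
  redisHost: string;
  redisPort: number;
  redisPassword: string | undefined;
  concurrency: number;
}

// Returns the parsed value only if it is a positive integer, else the fallback.
function parsePositiveInt(raw: string | undefined, fallback: number): number {
  const n = Number.parseInt(raw ?? "", 10);
  return Number.isInteger(n) && n > 0 ? n : fallback;
}

// `source` would typically be process.env; it is a parameter here so the
// function stays pure and easy to test.
function loadWorkerEnv(source: Record<string, string | undefined>): WorkerEnv {
  return {
    redisHost: source["REDIS_HOST"] || "localhost",
    redisPort: parsePositiveInt(source["REDIS_PORT"], 6379),
    redisPassword: source["REDIS_PASSWORD"] || undefined, // empty string means "no password"
    concurrency: parsePositiveInt(source["AGENT_WORKER_CONCURRENCY"], 1),
  };
}

const sampleEnv = loadWorkerEnv({ REDIS_PORT: "6380", AGENT_WORKER_CONCURRENCY: "abc" });
console.log(sampleEnv.redisHost, sampleEnv.redisPort, sampleEnv.concurrency); // localhost 6380 1
```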
Development Commands
# Run worker locally
pnpm dev
# Build worker
pnpm build
# Type checking
pnpm typecheck
Graceful Shutdown
The worker handles SIGTERM/SIGINT gracefully:
const shutdown = async (signal: string) => {
  logger.log(`Received ${signal}. Starting graceful shutdown...`);

  // 1. Close warm browser pool
  await closeAllWarm();

  // 2. Close workers (wait for current jobs to finish)
  await Promise.all(workers.map((w) => w.close()));

  // 3. Close Redis connection
  await redis.quit();

  process.exit(0);
};

process.on("SIGTERM", () => void shutdown("SIGTERM"));
process.on("SIGINT", () => void shutdown("SIGINT"));
Reference: apps/agent/src/index.ts:7-48
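One hardening step a handler like this may want is protection against repeated signals (for example, a second Ctrl+C while jobs are still draining) and against one failing step blocking the rest. The sketch below shows that pattern with stubbed steps; `createShutdown` and its parameters are hypothetical, and the excerpt above does not indicate whether the actual handler guards against re-entry.

```typescript
// Runs the shutdown steps in order, at most once, even if several signals arrive.
function createShutdown(
  steps: Array<() => Promise<void>>,
  onDone: (code: number) => void, // in the worker this would be process.exit
) {
  let started = false;
  return async (signal: string): Promise<void> => {
    if (started) return; // ignore repeated SIGINT/SIGTERM
    started = true;
    let code = 0;
    for (const step of steps) {
      try {
        await step();
      } catch (err) {
        code = 1; // keep going so later resources still get closed
        console.error(`shutdown step failed after ${signal}:`, err);
      }
    }
    onDone(code);
  };
}

// Stubbed usage: each step just records that it ran.
const trace: string[] = [];
const shutdownOnce = createShutdown(
  [
    async () => { trace.push("browsers"); },
    async () => { trace.push("workers"); },
    async () => { trace.push("redis"); },
  ],
  (code) => { trace.push(`exit ${code}`); },
);

void shutdownOnce("SIGTERM");
void shutdownOnce("SIGINT"); // ignored: shutdown is already in progress
```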