Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/theonetrade/backtest-ollama-crontab/llms.txt

Use this file to discover all available pages before exploring further.

SignalJobService is the bridge between the crawler ingestion layer and the LLM risk-screening layer. It listens on the signalJobSubject observable subject, and each time the subject fires it runs a queued job that reads unprocessed rows from parser-items, wraps each row in a correctly-timed execution context, invokes SignalLogicService.execute to obtain a RiskOutline assessment, writes the result to screen-items, and marks the source row as visited. The queued() wrapper from functools-kit ensures that overlapping crontab ticks do not spawn concurrent job runs — any trigger that arrives while a run is already in progress simply waits in line.

signalJobSubject emitter

Defined in config/emitters.ts. An instance of Subject<void> from functools-kit.
import { Subject } from "functools-kit";

export const signalJobSubject = new Subject<void>();
Calling signalJobSubject.next() fires all active subscriptions synchronously. CrawlerMainService calls it at the end of every crawl (crawlLiveFrame and crawlBacktestFrame), ensuring that new parser-items rows are processed immediately after ingestion.
Subject<void> carries no payload. The subscriber (SignalJobService.run) determines what work to do by inspecting the current execution mode and database state at the moment of invocation.

SignalJobService

SignalJobService is registered under TYPES.signalJobService and is accessible as core.signalJobService.
import { SignalJobService } from "@core/lib/services/job/SignalJobService";
// const signalJobService = inject<SignalJobService>(TYPES.signalJobService);

enable(): () => void

Subscribes to signalJobSubject and starts the queued job runner. Protected by singleshot() from functools-kit so the subscription is created at most once even if enable() is called multiple times.
public enable = singleshot(() => {
  const unJob    = signalJobSubject.subscribe(this.run);
  const unEnable = () => this.enable.clear();
  return compose(unJob, unEnable);
});
Returns a dispose function produced by compose(unJob, unEnable). Calling the returned function:
  1. Unsubscribes from signalJobSubject
  2. Clears the singleshot sentinel so enable() can be called again
Usage pattern:
// Application bootstrap
const dispose = signalJobService.enable();

// Later — e.g. test teardown or graceful shutdown
dispose();

disable(): void

Convenience method that calls the dispose function returned by enable() if it has previously been invoked.
public disable = () => {
  if (this.enable.hasValue()) {
    const lastSubscription = this.enable();
    lastSubscription();
  }
};
this.enable.hasValue() returns true only while the singleshot memo holds a value (i.e. after enable() has been called and before the returned dispose has been invoked). If enable() was never called, disable() is a no-op.
Typical lifecycle:
// Test setup
signalJobService.enable();

// ... run test scenarios ...

// Test teardown — stop listening and release resources
signalJobService.disable();

Internal Processing Pipeline

run() — queued entry point

private run = queued(async () => {
  const mode = await getMode();
  if (mode === "backtest") {
    const { frameName } = await getContext();
    await RUN_BACKTEST_FN(this, frameName);
    return;
  }
  if (mode === "live") {
    await RUN_LIVE_FN(this);
    return;
  }
});
The queued() wrapper from functools-kit serialises concurrent invocations: if a run is in progress when signalJobSubject.next() fires, the new call is enqueued and executed after the current one resolves. This prevents duplicate LLM calls for the same row when crontab ticks overlap.

RUN_LIVE_FN — live mode processing

const RUN_LIVE_FN = async (self: SignalJobService) => {
  const rowList = await self.parserDbService.findAllByVisited(false);
  for (const row of rowList) {
    if (await self.screenDbService.findByParserItem(row.id)) {
      continue;                              // already has a screen-item
    }
    const dto = await RUN_IN_CONTEXT_FN(self, row, false);
    await self.screenDbService.create(dto);
    await self.parserDbService.markVisited(row.id);
  }
};
1

Fetch unvisited rows

Calls parserDbService.findAllByVisited(false) to retrieve all parser-items documents where visited: false.
2

Dedup check

For each row, checks screenDbService.findByParserItem(row.id). If a screen-items document already exists for this parserItemId, the row is skipped to prevent duplicate LLM invocations.
3

Run LLM in context

Calls RUN_IN_CONTEXT_FN(self, row, backtest=false) to obtain an IScreenDto from SignalLogicService.execute.
4

Persist and mark

Writes the IScreenDto to screen-items via screenDbService.create(), then sets visited: true on the source parser-items row via parserDbService.markVisited(row.id).

RUN_BACKTEST_FN — backtest mode processing

const RUN_BACKTEST_FN = async (self: SignalJobService, frameName: string) => {
  const frameList = await listFrameSchema();
  const { startDate, endDate } = frameList.find(
    (frame) => frame.frameName === frameName
  );
  const rowList = await self.parserDbService.findAllByPublishedAt(startDate, endDate);
  for (const row of rowList) {
    if (await self.screenDbService.findByParserItem(row.id)) {
      continue;
    }
    const dto = await RUN_IN_CONTEXT_FN(self, row, true);
    await self.screenDbService.create(dto);
    await self.parserDbService.markVisited(row.id);
  }
};
1

Resolve frame boundaries

Calls listFrameSchema() from backtest-kit and finds the frame whose frameName matches the current context. Extracts startDate and endDate.
2

Fetch rows in date range

Calls parserDbService.findAllByPublishedAt(startDate, endDate) to get all parser-items within the frame window.
3

Dedup check

Same guard as live mode — skips rows that already have a screen-items counterpart.
4

Run LLM in historical context

Calls RUN_IN_CONTEXT_FN(self, row, backtest=true). The backtest flag is forwarded to ExecutionContextService.runInContext so the LLM agent sees the correct historical market context rather than the current moment.
5

Persist and mark

Identical to live mode — writes to screen-items and marks the parser-items row visited.

RUN_IN_CONTEXT_FN — execution context wrapper

const RUN_IN_CONTEXT_FN = beginTime(
  async (self: SignalJobService, row: IParserRow, backtest: boolean) => {
    const when = alignToInterval(row.publishedAt, "1m");
    return await ExecutionContextService.runInContext(
      async () => {
        return await self.signalLogicService.execute(row);
      },
      {
        symbol:   row.symbol,
        when,
        backtest,
      },
    );
  },
);
This is the most important piece of the job pipeline from a correctness standpoint. It uses two primitives from backtest-kit:
  • alignToInterval(row.publishedAt, "1m") — snaps the signal’s publication timestamp to the nearest 1-minute candle boundary. This is the when value the LLM context will see as “now”.
  • ExecutionContextService.runInContext(fn, { symbol, when, backtest }) — establishes a scoped execution context so the agent-swarm-kit LLM call inside SignalLogicService.execute queries market data at the correct historical moment rather than wall-clock time.
  • beginTime(...) — wraps the entire function, recording the start timestamp for performance and audit logging.
Without ExecutionContextService.runInContext, the LLM would evaluate market conditions at the current wall-clock time. For backtest runs this would produce anachronistic results — always wrap LLM calls with RUN_IN_CONTEXT_FN rather than calling signalLogicService.execute directly.

enable / disable Pattern

import { inject } from "@core/lib/core/di";
import TYPES from "@core/lib/core/types";
import SignalJobService from "@core/lib/services/job/SignalJobService";

const signalJobService = inject<SignalJobService>(TYPES.signalJobService);

// --- Application startup ---
const disposeJob = signalJobService.enable();

// --- Application shutdown / test teardown ---
signalJobService.disable();
// or equivalently:
// disposeJob();

enable() returns dispose

The returned function both unsubscribes from signalJobSubject and resets the singleshot sentinel so enable() can be safely re-called after a disable().

disable() is safe to call anytime

disable() checks enable.hasValue() before acting. Calling it when enable() was never invoked is a safe no-op — useful in test teardown hooks that run unconditionally.

Data Flow Summary

signalJobSubject.next()


SignalJobService.run()  (queued — one at a time)

        ├─[backtest]─ RUN_BACKTEST_FN
        │               └─ parserDbService.findAllByPublishedAt(startDate, endDate)

        └─[live]──── RUN_LIVE_FN
                        └─ parserDbService.findAllByVisited(false)

        ▼ (both paths)
RUN_IN_CONTEXT_FN
        ├─ alignToInterval(row.publishedAt, "1m")
        └─ ExecutionContextService.runInContext(...)
                └─ signalLogicService.execute(row)
                        └─ json(OutlineName.RiskOutline, ...)


screenDbService.create(dto)
parserDbService.markVisited(row.id)

SignalLogicService

The LLM execution layer called inside RUN_IN_CONTEXT_FN — runs the RiskOutline and maps results to IScreenDto.

CrawlerService

Triggers signalJobSubject.next() after every crawl to kick off this pipeline.

Build docs developers (and LLMs) love