NorthStar provides four lightweight helpers, log_event(), log_metric(), log_metadata(), and current_trace_id(), that work against the active trace or span from anywhere in your call stack. The first three attach structured data to it, and current_trace_id() returns the ID of the active run. Unlike span() or observe(), they do not create new spans or alter the trace hierarchy; they only annotate or read whatever is currently in scope. A fifth helper, flush(), drains the queue synchronously when you need to know whether pending data has been sent before the process exits. All five functions are top-level exports of the northstar package and are safe to call from any thread.

log_event()

def log_event(name: str, data: Any = None) -> None
Attaches a named custom event to the innermost open span, or to the run itself if no span is currently active. Events are stored as EventType.CUSTOM records and are visible in the dashboard’s event timeline.
name
str
required
A short, dot-namespaced or snake_case label identifying the event, e.g. "cache_miss", "fallback_triggered", or "retrieval_started".
data
Any
default:"None"
Any JSON-serialisable value associated with the event. Dicts, lists, primitives, Pydantic models, and dataclasses are all accepted. Sensitive keys matching the redact list are replaced with "[REDACTED]" before the event is enqueued.
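To make the redaction behaviour concrete, here is a minimal stand-alone sketch of key-based masking. It is not NorthStar's implementation: REDACT_KEYS and redact() are hypothetical names, the key list is invented for illustration, and recursing into nested containers is an assumption.

```python
from typing import Any

# Hypothetical redact list; the real key names come from your NorthStar configuration.
REDACT_KEYS = {"password", "api_key", "authorization"}


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive dict keys masked, recursing into containers."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if str(key).lower() in REDACT_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    return value


event_data = {"query": "reset my login", "api_key": "sk-123", "user": {"password": "hunter2"}}
cleaned = redact(event_data)
# cleaned masks "api_key" and the nested "password"; event_data itself is left unchanged.
```

Masking a copy rather than the caller's object means the application can keep using the original values after the event has been logged.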
Example — attaching to a span:
@northstar.observe("retrieve-docs")
def retrieve_docs(query: str) -> list[str]:
    northstar.log_event("retrieval_started", {"query": query})
    results = vector_db.search(query)
    northstar.log_event("retrieval_complete", {"count": len(results)})
    return results
Example — attaching to the run:
@northstar.trace("support-agent")
def run_agent(query: str) -> str:
    # no span is open here — event attaches to the run
    northstar.log_event("agent_started", {"version": "2.1"})
    return handle(query)
log_event() is a no-op when called outside any active trace. It never raises an exception, so it is safe to call unconditionally.

log_metric()

def log_metric(name: str, value: Real) -> None
Records a named numeric metric on the current span or run. Metrics are stored as EventType.CUSTOM records with attributes["northstar_type"] = "metric" and are displayed separately from events in the dashboard.
name
str
required
Metric name, e.g. "retrieval_count", "confidence", "latency_ms".
value
Real
required
A numeric value (int or float). bool is explicitly rejected, even though bool is a subclass of int in Python: passing True or False raises a TypeError with the message "metric value must be numeric". This guard prevents accidental boolean-as-number mistakes.
Example:
@northstar.observe("generate-answer")
def generate_answer(query: str, docs: list[str]) -> str:
    northstar.log_metric("retrieval_count", len(docs))
    northstar.log_metric("context_length", sum(len(d) for d in docs))
    return llm.complete(query, context=docs)
Passing a bool value raises TypeError immediately, even in no-op mode. This validation always runs so bugs are caught in development and in tests.
northstar.log_metric("ready", True)   # ❌ raises TypeError
northstar.log_metric("score", 0.86)   # ✅
northstar.log_metric("count", 5)      # ✅
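The bool check needs to be explicit because isinstance(True, int) is True in Python, so a plain numeric check would let booleans through. A minimal sketch of such a guard follows; validate_metric_value() is a hypothetical helper, not part of the northstar package.

```python
from numbers import Real


def validate_metric_value(value: object) -> Real:
    """Accept int and float values; reject bool even though it subclasses int."""
    # The bool test comes first because bool would otherwise pass the Real test.
    if isinstance(value, bool) or not isinstance(value, Real):
        raise TypeError("metric value must be numeric")
    return value


validate_metric_value(5)     # accepted
validate_metric_value(0.86)  # accepted
```

If you do want to record a flag as a metric, convert it yourself first, for example with int(ready), so the intent is visible at the call site.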

log_metadata()

def log_metadata(metadata: Mapping[str, Any]) -> None
Merges metadata into the current span’s attributes dictionary, or into the run’s metadata dictionary if no span is active. The update is shallow (dict.update()), so existing keys that are not in metadata are preserved, while keys that are present are overwritten wholesale, including nested dictionaries. The updated record is re-enqueued immediately so dashboard views reflect the change without waiting for the next flush.
metadata
Mapping[str, Any]
required
Key-value pairs to merge. Values must be JSON-serialisable. Sensitive keys are redacted automatically.
Example — attaching to the run:
@northstar.trace("research-agent")
def run_agent(user_id: str, query: str) -> str:
    northstar.log_metadata({"user_id": user_id, "plan": "pro"})
    return handle(query)
Example — attaching to the current span:
@northstar.observe("vector-search")
def vector_search(query: str) -> list[str]:
    results = index.query(query)
    northstar.log_metadata({"result_count": len(results), "index": "docs-v2"})
    return results
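Because the merge is shallow, a nested dictionary is replaced rather than combined. The plain-Python snippet below shows the effect using dict.update() directly; the attributes dictionary and its keys are made up for illustration.

```python
# Stand-in for a span's attributes after a first log_metadata() call.
attributes = {"index": "docs-v2", "filters": {"lang": "en", "year": 2024}}

# Equivalent of a second log_metadata() call under shallow-merge semantics.
attributes.update({"result_count": 3, "filters": {"lang": "de"}})

# "index" survives, "result_count" is added, and "filters" now holds
# only {"lang": "de"}: the earlier "year" entry is gone.
```

If you need to keep nested values, merge them yourself and pass the complete nested dictionary.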

current_trace_id()

def current_trace_id() -> str | None
Returns the UUID of the currently active run as a str, or None if called outside any trace. Use this to correlate your application’s own log lines with NorthStar trace records.
Example — structured logging correlation:
import logging
import northstar

logger = logging.getLogger(__name__)

@northstar.trace("support-agent")
def run_agent(query: str) -> str:
    trace_id = northstar.current_trace_id()
    logger.info("Agent started", extra={"trace_id": trace_id, "query": query})

    result = handle(query)

    logger.info("Agent finished", extra={"trace_id": trace_id})
    return result
Example — propagating to downstream services:
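import requests

with northstar.trace("orchestrator") as trace:
    trace_id = northstar.current_trace_id()
    # Send the header only when a trace ID is available (it is None in no-op mode).
    headers = {"X-Trace-Id": trace_id} if trace_id else {}
    response = requests.post("https://tool-service/run", json=payload, headers=headers)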
current_trace_id() returns None before northstar.init() is called, outside a trace() block, and in no-op mode. Check the return value before passing it to external systems.
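One way to handle the None case uniformly is to stamp the trace ID onto every log record with a logging.Filter, instead of passing extra= at each call site. The sketch below uses only the standard library. TraceIdFilter and fake_trace_id() are hypothetical; in an application you would pass northstar.current_trace_id as the getter.

```python
import logging
from typing import Callable, Optional


class TraceIdFilter(logging.Filter):
    """Stamp every record with the active trace ID, or "-" when there is none."""

    def __init__(self, get_trace_id: Callable[[], Optional[str]]) -> None:
        super().__init__()
        self._get_trace_id = get_trace_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = self._get_trace_id() or "-"
        return True  # never drop the record


def fake_trace_id() -> Optional[str]:
    """Stand-in getter that behaves like a call made outside any trace."""
    return None


handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s trace=%(trace_id)s %(message)s"))
handler.addFilter(TraceIdFilter(fake_trace_id))

logger = logging.getLogger("northstar_sketch")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("Agent started")
```

The placeholder keeps the format string valid when no trace is active, so log lines emitted before northstar.init() or in no-op mode still render.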

flush()

def flush(timeout: float | None = None) -> bool
Drains the internal queue synchronously — all pending sessions, runs, spans, and events are sent to the ingest endpoint in a single HTTP request before the function returns. Returns True if the flush succeeded, False if an error occurred (network failure, timeout, etc.).
timeout
float | None
default:"None"
Maximum seconds to wait for the HTTP request to complete. When None, the client’s default timeout applies. Must be greater than zero if provided; raises ValueError otherwise.
Example — flush before process exit:
northstar.flush()
Example — flush with a deadline:
success = northstar.flush(timeout=3.0)
if not success:
    print("NorthStar flush timed out or failed")
Example — flush in a test:
import json

def test_agent_records_span(mock_ingest_endpoint):
    run_agent("hello")
    # Force delivery so the mock endpoint has received the batch before asserting.
    assert northstar.flush() is True
    payload = json.loads(mock_ingest_endpoint.calls[0].request.content)
    assert payload["runs"][0]["status"] == "ok"
The background worker flushes automatically on a timer (flush_interval) and when the batch size is reached (batch_size). You only need to call flush() explicitly when you must guarantee delivery before a deadline — for example, at the end of a one-shot script, inside tests, or in a serverless function handler before it returns.
Passing timeout <= 0 raises ValueError immediately. Pass None to use the default timeout.
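For the serverless case, a try/finally block ensures the flush runs on both the success path and the error path. The sketch below is illustrative only: handler() and its event shape are hypothetical, and fake_flush() is a stand-in that mirrors the documented signature so the example runs without the northstar package (its error message is invented).

```python
from typing import Callable, Optional


def fake_flush(timeout: Optional[float] = None) -> bool:
    """Stand-in with the documented signature; use northstar.flush in real code."""
    if timeout is not None and timeout <= 0:
        raise ValueError("timeout must be greater than zero")
    return True


def handler(event: dict, flush: Callable[..., bool] = fake_flush) -> dict:
    """Hypothetical serverless entry point."""
    try:
        return {"status": 200, "echo": event.get("query")}
    finally:
        # Runs whether the body returned or raised, before the handler exits.
        if not flush(timeout=3.0):
            print("NorthStar flush timed out or failed")


response = handler({"query": "hello"})
```

Keeping the timeout short bounds how long a slow ingest endpoint can delay the response.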
