Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/microsoft/agent-governance-toolkit/llms.txt

Use this file to discover all available pages before exploring further.

Every action an AI agent takes — tool calls, policy decisions, trust handshakes, approval workflows — must be recorded in a tamper-evident log. Without it you cannot answer the question every auditor will ask: “What exactly did this agent do, and who authorised it?” The Agent Governance Toolkit provides two complementary audit components:
| Class | Module | Best for |
| --- | --- | --- |
| AuditLog | agentmesh.governance.audit | Full Merkle-chained log with query, CloudEvents export, and external sinks |
| GovernanceAuditLogger | agent_os.audit_logger | Lightweight pluggable-backend logger used internally by govern() |
Both are covered below. Most application code uses AuditLog. GovernanceAuditLogger is the lower-level primitive used inside the governance gate.

AuditLog

Import

from agentmesh.governance.audit import AuditLog, AuditEntry, AuditChain

Constructor

AuditLog(*, sink: AuditSink | None = None)
sink
AuditSink | None
default:"None"
Optional external sink for durable storage. When None, entries are stored in memory only (suitable for development and testing). In production pass a FileAuditSink or a custom AuditSink implementation.
from agentmesh.governance.audit import AuditLog
from agentmesh.governance.audit_backends import FileAuditSink

sink = FileAuditSink(
    path="audit_trail.jsonl",
    secret_key=b"my-hmac-secret",
)
audit = AuditLog(sink=sink)

log() — Record an event

def log(
    event_type: str,
    agent_did: str,
    action: str,
    resource: str | None = None,
    data: dict | None = None,
    outcome: str = "success",
    policy_decision: str | None = None,
    trace_id: str | None = None,
    *,
    arguments_hash: str | None = None,
    approver_did: str | None = None,
    policy_version: str | None = None,
    issued_at: datetime | None = None,
    completed_at: datetime | None = None,
) -> AuditEntry
Appends a governance event to the Merkle chain, optionally writes it to the configured sink, and returns the completed AuditEntry.
event_type
str
required
Event classification. Standard values:
| Value | When |
| --- | --- |
| tool_invocation | Agent successfully called a tool |
| tool_blocked | Policy denied a tool call |
| policy_evaluation | Policy engine evaluated a request |
| policy_violation | Agent violated a governance policy |
| rogue_detection | Anomaly detection flagged the agent |
| agent_invocation | Agent-to-agent delegation occurred |
| trust_handshake | IATP trust handshake completed |
| agent_registered | New agent identity registered |
| audit_integrity | Chain integrity check completed |
| approval_decision | Human approval was requested and resolved |
agent_did
str
required
DID of the acting agent (e.g. "did:mesh:a3f8c2..." or "did:web:agent.example.com").
action
str
required
The action the agent took or attempted (e.g. "web_search", "delete_file").
resource
str | None
default:"None"
Resource path or identifier that was accessed (e.g. "/api/users", "crm/contacts").
data
dict | None
default:"None"
Arbitrary metadata about the event. Secrets are expected to be stripped by the caller before passing here.
outcome
str
default:"success"
Result of the action: "success", "failure", "denied", or "error".
policy_decision
str | None
default:"None"
Human-readable policy result (e.g. "allowed", "denied by block-destructive").
trace_id
str | None
default:"None"
Distributed tracing correlation ID. Used to correlate entries across a multi-agent workflow.
arguments_hash
str | None
default:"None"
(keyword-only) SHA-256 hex digest of the canonical-JSON serialization of the action’s arguments. Defends against silent mutation of recorded arguments.
approver_did
str | None
default:"None"
(keyword-only) DID of the principal whose approval authorised this action.
policy_version
str | None
default:"None"
(keyword-only) Version identifier of the policy bundle active at decision time.
issued_at
datetime | None
default:"None"
(keyword-only) UTC datetime when the action was authorised or issued for execution. Paired with completed_at to compute verifiable execution latency.
completed_at
datetime | None
default:"None"
(keyword-only) UTC datetime when the action’s outcome was recorded.
Returns an AuditEntry with entry_id, entry_hash, and previous_hash populated by the Merkle chain.
entry = audit.log(
    event_type="tool_invocation",
    agent_did="did:web:sales-assistant.example.com",
    action="allow",
    resource="/crm/contacts",
    data={"tool": "crm_lookup", "query": "acme corp"},
    outcome="success",
    trace_id="trace-7f3a",
)

print(entry.entry_id)    # "audit_a1b2c3d4e5f6..."
print(entry.entry_hash)  # SHA-256 hex digest
print(entry.timestamp)   # UTC datetime

query() — Search the audit trail

def query(
    agent_did: str | None = None,
    event_type: str | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    outcome: str | None = None,
    limit: int = 100,
) -> list[AuditEntry]
Filters and returns matching entries. All parameters are optional and additive. Returns at most limit entries from the tail of the matching set.
from datetime import datetime, timezone, timedelta

yesterday = datetime.now(timezone.utc) - timedelta(days=1)

denied = audit.query(
    event_type="tool_blocked",
    outcome="denied",
    start_time=yesterday,
    limit=200,
)

for e in denied:
    print(f"{e.timestamp} | {e.agent_did} | {e.resource}")

get_entry() — Look up by ID

def get_entry(self, entry_id: str) -> AuditEntry | None
Returns the AuditEntry with the given entry_id, or None if not found.

get_entries_for_agent() — All entries by agent

def get_entries_for_agent(
    self,
    agent_did: str,
    limit: int = 100,
) -> list[AuditEntry]
Returns the most recent limit entries for the specified agent DID.

get_entries_by_type() — All entries by type

def get_entries_by_type(
    self,
    event_type: str,
    limit: int = 100,
) -> list[AuditEntry]
Returns the most recent limit entries of the given event type.

verify_integrity() — Merkle chain verification

def verify_integrity(self) -> tuple[bool, str | None]
Verifies the entire Merkle chain: every entry’s stored hash is recomputed and checked, and each entry’s previous_hash is verified against the prior entry. Returns (True, None) on success, (False, error_message) on failure.
is_valid, error = audit.verify_integrity()
if not is_valid:
    raise RuntimeError(f"Audit trail tampered: {error}")
print("✅ Audit chain intact")

get_proof() — Merkle inclusion proof

def get_proof(self, entry_id: str) -> dict | None
Generates a Merkle inclusion proof for the given entry. Returns None if the entry is not found.
entry
dict
The AuditEntry serialized as a dict.
merkle_proof
list[tuple[str, str]]
List of (sibling_hash, position) tuples. position is "left" or "right". O(log n) length.
merkle_root
str
Current Merkle root hash.
verified
bool
True if verify_proof(entry.entry_hash, proof, merkle_root) passes.
proof = audit.get_proof(entry.entry_id)
assert proof["verified"], "Proof failed"

# Publish the root hash and give auditors the proof to verify independently
print(f"Root: {proof['merkle_root']}")
print(f"Proof steps: {len(proof['merkle_proof'])}")

export() — Full log export

def export(
    start_time: datetime | None = None,
    end_time: datetime | None = None,
) -> dict
Exports matching entries as a plain dict with chain metadata.
exported_at
str
ISO 8601 UTC timestamp of the export.
merkle_root
str
Current Merkle root hash.
entry_count
int
Number of entries in the export.
entries
list[dict]
All matching AuditEntry dicts.

export_cloudevents() — CloudEvents v1.0 export

def export_cloudevents(
    start_time: datetime | None = None,
    end_time: datetime | None = None,
) -> list[dict]
Exports entries as CloudEvents v1.0 JSON envelopes. Each entry is converted by AuditEntry.to_cloudevent().
events = audit.export_cloudevents(start_time=yesterday)
for ce in events:
    print(ce["type"])    # e.g. "ai.agentmesh.tool.invoked"
    print(ce["source"])  # agent DID
    print(ce["id"])      # entry_id
CloudEvents type mapping:
| event_type | CloudEvents type |
| --- | --- |
| tool_invocation | ai.agentmesh.tool.invoked |
| tool_blocked | ai.agentmesh.tool.blocked |
| policy_evaluation | ai.agentmesh.policy.evaluation |
| policy_violation | ai.agentmesh.policy.violation |
| trust_handshake | ai.agentmesh.trust.handshake |
| trust_score_updated | ai.agentmesh.trust.score.updated |
| agent_registered | ai.agentmesh.agent.registered |
| agent_verified | ai.agentmesh.agent.verified |
| audit_integrity | ai.agentmesh.audit.integrity.verified |
| (other) | ai.agentmesh.{event_type} |
CloudEvents envelope structure:
{
  "specversion": "1.0",
  "id": "audit_a1b2c3d4e5f60718",
  "type": "ai.agentmesh.tool.invoked",
  "source": "did:web:agent.example.com",
  "time": "2025-01-15T10:30:00.000000Z",
  "datacontenttype": "application/json",
  "data": {
    "action": "allow",
    "resource": "/api/users",
    "outcome": "success",
    "policy_decision": "allowed",
    "matched_rule": null
  },
  "agentmeshentryhash": "sha256-of-entry",
  "agentmeshprevioushash": "sha256-of-prior-entry",
  "traceid": "trace-7f3a"
}

AuditEntry Fields

Every call to audit.log() returns an AuditEntry (Pydantic BaseModel).
entry_id
str
Unique identifier in the format audit_{16-hex-chars}.
timestamp
datetime
UTC datetime when the entry was created.
event_type
str
Event classification string.
agent_did
str
DID of the acting agent.
action
str
The action the agent took or attempted.
resource
str | None
Resource path or identifier, when applicable.
data
dict
Arbitrary event metadata.
outcome
str
"success", "failure", "denied", or "error".
policy_decision
str | None
Human-readable policy result.
matched_rule
str | None
Policy rule name that fired, when applicable.
previous_hash
str
SHA-256 hash of the prior entry in the chain. "" for the first entry.
entry_hash
str
SHA-256 hash of this entry’s canonical fields. Populated by AuditChain.add_entry().
trace_id
str | None
Distributed tracing correlation ID.
session_id
str | None
Session identifier.
sandbox_id
str | None
Auto-populated from SANDBOX_ID or OPENSHELL_SANDBOX_ID environment variables.
environment
str | None
Auto-populated from AGT_ENVIRONMENT environment variable.
compute_driver
str | None
Auto-populated from OPENSHELL_COMPUTE_DRIVER environment variable.
arguments_hash
str | None
SHA-256 hex digest of the action arguments (keyword-only at log time).
approver_did
str | None
DID of the approver who authorised this action.
policy_version
str | None
Version of the policy bundle active at decision time.
issued_at
datetime | None
Optional UTC datetime when the action was authorised or issued for execution. Paired with completed_at to compute verifiable execution latency.
completed_at
datetime | None
Optional UTC datetime when the action’s outcome was recorded.

AuditEntry methods

# Recompute the SHA-256 hash from canonical fields
computed = entry.compute_hash()

# Verify stored hash matches computed hash
assert entry.verify_hash()

# Export as CloudEvents v1.0 envelope
ce = entry.to_cloudevent()

Merkle Chain Integrity

AuditLog internally delegates to AuditChain (a Merkle audit chain). Every entry is hashed, and that hash is included in the next entry’s hash computation, creating a tamper-evident chain. The entry hashes also form the leaves of a Merkle tree whose root hash summarises the whole log:
        Root Hash
       /         \
    H(AB)       H(CD)
   /    \      /    \
  H(A)  H(B) H(C)  H(D)   ← leaf = SHA-256 of AuditEntry
Key properties:
  • Append-only — entries cannot be removed or reordered without detection.
  • Tamper-evident — changing any entry invalidates the root hash.
  • Efficient proofs — proving an entry exists requires O(log n) hashes.

External proof verification

from agentmesh.governance.audit import AuditChain

# An auditor with only the root hash can verify inclusion:
chain = AuditChain()
verified = chain.verify_proof(
    entry_hash="abc123...",
    proof=[("def456...", "left"), ("789aaa...", "right")],
    root_hash="expected-root...",
)
print(f"Entry in log: {verified}")

External Sinks

In-memory audit is fine for development. In production, use FileAuditSink or implement the AuditSink protocol.

FileAuditSink

from agentmesh.governance.audit_backends import FileAuditSink

sink = FileAuditSink(
    path="audit_trail.jsonl",
    secret_key=b"change-me-to-a-real-secret",   # HMAC signing key
    max_file_size=50 * 1024 * 1024,              # rotate at 50 MB (0 = no rotation)
)
audit = AuditLog(sink=sink)

# Each entry is HMAC-signed and hash-chained.
# Verify the on-disk chain independently:
is_valid, error = sink.verify_integrity()

sink.close()
Every line in the output JSONL file contains content_hash, previous_hash, and an HMAC signature.

Custom sink — PostgreSQL example

import os

from agentmesh.governance.audit import AuditEntry, AuditLog
from agentmesh.governance.audit_backends import AuditSink

class PostgresSink:
    """Push audit entries to a PostgreSQL table.

    Every call to ``write`` or ``write_batch`` runs as one transaction: it is
    committed when all rows were inserted and rolled back when any insert
    fails. Rolling back matters because the connection is shared across
    calls; without it a single failed INSERT would leave the connection in an
    aborted transaction and every later write would fail as well.
    """

    # Column order must match the tuple built by _row().
    _INSERT_SQL = """
        INSERT INTO audit_log
            (entry_id, timestamp, event_type, agent_did,
             action, resource, outcome, entry_hash, trace_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    def __init__(self, dsn: str):
        """Open a connection to the database identified by *dsn*."""
        # Imported here so psycopg2 is only loaded when the sink is created.
        import psycopg2
        self._conn = psycopg2.connect(dsn)

    @staticmethod
    def _row(entry: AuditEntry) -> tuple:
        """Return the INSERT parameters for *entry*, in column order."""
        return (
            entry.entry_id,
            entry.timestamp.isoformat(),
            entry.event_type,
            entry.agent_did,
            entry.action,
            entry.resource,
            entry.outcome,
            entry.entry_hash,
            entry.trace_id,
        )

    def write(self, entry: AuditEntry) -> None:
        """Insert a single entry and commit it."""
        self.write_batch([entry])

    def write_batch(self, entries: list[AuditEntry]) -> None:
        """Insert *entries* atomically: all rows are stored, or none are.

        An empty batch is a no-op. Any database error is re-raised after the
        transaction has been rolled back.
        """
        rows = [self._row(entry) for entry in entries]
        if not rows:
            return
        try:
            with self._conn.cursor() as cur:
                cur.executemany(self._INSERT_SQL, rows)
            self._conn.commit()
        except Exception:
            # Reset the connection before propagating so the next write
            # starts from a clean transaction.
            self._conn.rollback()
            raise

    def verify_integrity(self) -> tuple[bool, str | None]:
        """Report the sink as valid.

        Placeholder: this example performs no verification of the stored
        rows and always returns ``(True, None)``.
        """
        return True, None

    def close(self) -> None:
        """Close the database connection."""
        self._conn.close()

audit = AuditLog(sink=PostgresSink(dsn=os.environ["DATABASE_URL"]))

GovernanceAuditLogger

GovernanceAuditLogger is a lightweight multi-backend logger used internally by govern() and available for direct use when you need a simple structured logger without the full Merkle chain.

Import

from agent_os.audit_logger import GovernanceAuditLogger, AuditEntry
from agent_os.audit_logger import JsonlFileBackend, InMemoryBackend, LoggingBackend

Constructor

GovernanceAuditLogger()
Creates an empty logger with no backends. Add backends before logging.

add_backend() — Register a backend

audit = GovernanceAuditLogger()
audit.add_backend(InMemoryBackend())
audit.add_backend(JsonlFileBackend("governance.jsonl"))

log_decision() — Record a governance decision

def log_decision(
    self,
    agent_id: str,
    action: str,
    decision: str,
    reason: str = "",
    latency_ms: float = 0.0,
    **metadata: Any,
) -> None
agent_id
str
required
Identifier of the acting agent.
action
str
required
The action that was evaluated.
decision
str
required
The governance decision: "allow", "deny", "audit", etc.
reason
str
default:""
Human-readable explanation.
latency_ms
float
default:"0.0"
Evaluation latency in milliseconds.
**metadata
Any
Additional key-value pairs stored in AuditEntry.metadata.

log() — Write a raw AuditEntry

audit.log(entry: AuditEntry) -> None
Writes a pre-built AuditEntry directly to all registered backends.

flush() — Flush all backends

audit.flush() -> None

Built-in backends

| Class | Behaviour |
| --- | --- |
| JsonlFileBackend(path) | Writes JSONL to file. Thread-safe (internal lock). File created with 0o600 permissions on POSIX. |
| InMemoryBackend() | Stores entries in backend.entries list. Use in tests. |
| LoggingBackend(logger_name="agent_os.audit") | Emits via Python logging at INFO level. |

OpenTelemetry Integration

Route governance audit entries as structured OTel LogRecords:
from agent_os.audit_logger import GovernanceAuditLogger
from agent_os.otel_audit_backend import OTelLogsBackend

audit = GovernanceAuditLogger()
audit.add_backend(OTelLogsBackend())

# All log_decision() calls are now emitted as OTel LogRecords
audit.log_decision(agent_id="agent-1", action="web_search", decision="allow")
OTelLogsBackend is a safe no-op when opentelemetry-sdk is not installed. Check backend.enabled to confirm it initialised correctly.
logger_name
str
default:"agent_os.governance.audit"
OTel logger instrument name.
logger_provider
Any
default:"None"
Explicit LoggerProvider. When None, uses the global provider.
service_name
str
default:"agent-governance-toolkit"
Service name written into the log resource.
OTel attributes emitted per entry:
| Attribute | Value |
| --- | --- |
| event.domain | "agent_os.governance" |
| event.name | "audit_entry" |
| agt.agent.id | agent_id |
| agt.audit.event_type | event_type |
| agt.audit.action | action |
| agt.audit.decision | decision |
| agt.audit.reason | reason (when non-empty) |
| agt.audit.latency_ms | latency_ms |
| agt.audit.meta.* | All metadata keys promoted as string attributes |

GovernanceEventSink (Advanced)

For high-throughput SIEM / XDR routing, use the GovernanceEventProcessor with the GovernanceEventSink protocol. This mirrors the OpenTelemetry BatchSpanProcessor pattern with DROP_OLDEST backpressure and per-sink circuit breakers.
from agent_os.event_sink import (
    GovernanceEventProcessor,
    GovernanceEvent,
    GovernanceEventKind,
    GovernanceEventSinkBase,
    SinkExportResult,
)

class SIEMSink(GovernanceEventSinkBase):
    """Sink that forwards governance events to a SIEM over HTTPS."""

    def emit(self, events) -> SinkExportResult:
        """Serialize *events* and report whether the export succeeded.

        Returns SinkExportResult.SUCCESS when serialization (and the
        transport call, once enabled) completes, and
        SinkExportResult.FAILURE if any exception is raised along the way.
        """
        try:
            serialized = [event.to_dict() for event in events]
            # post_to_siem(serialized)
        except Exception:
            return SinkExportResult.FAILURE
        else:
            return SinkExportResult.SUCCESS

# Set up the processor
processor = GovernanceEventProcessor(
    max_queue_size=1024,        # or AGT_GSP_MAX_QUEUE_SIZE env var
    schedule_delay_ms=2000,     # or AGT_GSP_SCHEDULE_DELAY_MS
    max_batch_size=100,         # or AGT_GSP_MAX_BATCH_SIZE
    export_timeout_ms=10000,    # or AGT_GSP_EXPORT_TIMEOUT_MS
)
processor.add_sink(SIEMSink())

# Emit an event
event = GovernanceEvent(
    kind=GovernanceEventKind.POLICY_VIOLATION,
    severity="warning",
    agent_id="agent-001",
    action="delete_database",
    decision="deny",
    reason="Destructive operation blocked",
)
processor.on_event(event)

# Graceful shutdown (flushes remaining events)
processor.shutdown(timeout_ms=5000)
GovernanceEventKind values: POLICY_CHECK, POLICY_VIOLATION, TOOL_CALL_BLOCKED, PROMPT_INJECTION_DETECTED, IDENTITY_VERIFIED, IDENTITY_REJECTED, RESOURCE_ACCESS, ESCALATION_REQUESTED, CHECKPOINT_CREATED, ANOMALY_DETECTED, MCP_TOOL_POISONING, CONTENT_VIOLATION.

AuditBackendSinkAdapter

Bridges the legacy AuditBackend write/flush protocol to the batch-oriented GovernanceEventSink, allowing JsonlFileBackend, OTelLogsBackend, and custom backends to work with GovernanceEventProcessor without modification:
from agent_os.event_sink import AuditBackendSinkAdapter
from agent_os.audit_logger import JsonlFileBackend

adapter = AuditBackendSinkAdapter(JsonlFileBackend("governance.jsonl"))
processor.add_sink(adapter)

Decision BOM

Every audit_entry dict in a PolicyDecision is a Decision Bill of Materials (BOM) — a record of every factor that contributed to the governance decision. For on-demand reconstruction of full BOMs from observability signals:
from agentmesh.governance.decision_bom import DecisionBOMReconstructor

reconstructor = DecisionBOMReconstructor(
    audit_source=my_audit_backend,
    trust_source=my_trust_store,
    policy_source=my_policy_log,
)

bom = reconstructor.reconstruct(trace_id="trace-7f3a")
print(bom.outcome)            # "allow"
print(bom.completeness_score) # 0.8 (0.0–1.0)

for field in bom.fields:
    print(f"{field.category.value}: {field.name} = {field.value}")

Complete Example

from datetime import datetime, timezone, timedelta
from agentmesh.governance.audit import AuditLog
from agentmesh.governance.audit_backends import FileAuditSink

# Production setup: HMAC-signed JSONL on disk
sink = FileAuditSink(
    path="audit_trail.jsonl",
    secret_key=b"change-me-in-production",
)

try:
    audit = AuditLog(sink=sink)

    # Log a policy evaluation
    entry = audit.log(
        event_type="policy_evaluation",
        agent_did="did:web:research-agent.example.com",
        action="allow",
        resource="/api/papers",
        data={"tool": "web_search", "query": "OWASP AI security"},
        outcome="success",
        policy_decision="allowed",
        trace_id="trace-abc123",
        policy_version="1.0",
    )

    print(f"Entry ID: {entry.entry_id}")
    print(f"Hash: {entry.entry_hash[:16]}...")

    # Verify chain integrity. Raise explicitly rather than assert: assert
    # statements are removed when Python runs with -O, which would silently
    # skip the check.
    is_valid, error = audit.verify_integrity()
    if not is_valid:
        raise RuntimeError(f"Chain broken: {error}")

    # Query for recent violations
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    violations = audit.query(event_type="policy_violation", start_time=yesterday)
    print(f"Violations in last 24h: {len(violations)}")

    # Export for auditors
    data = audit.export(start_time=yesterday)
    print(f"Merkle root: {data['merkle_root']}")
    print(f"Entries: {data['entry_count']}")

    # CloudEvents export for SIEM ingestion
    cloud_events = audit.export_cloudevents(start_time=yesterday)
finally:
    # Always release the sink, even when a step above raised.
    sink.close()

See Also

Build docs developers (and LLMs) love