Skip to main content

Installation

pip install -e clients/python
Requires Python 3.10+, grpcio >= 1.78.0, protobuf >= 6.31.1, and a running Membrane daemon (default: localhost:9090).

Quick start

from membrane import MembraneClient, Sensitivity, TrustContext

# Connect to the running Membrane daemon
client = MembraneClient("localhost:9090")

# Ingest an event
record = client.ingest_event(
    event_kind="file_edit",
    ref="src/main.py",
    summary="Refactored authentication module",
    sensitivity=Sensitivity.LOW,
)
print(f"Created record: {record.id}")

# Retrieve memories relevant to a task
trust = TrustContext(
    max_sensitivity=Sensitivity.MEDIUM,
    authenticated=True,
    actor_id="agent-1",
)
records = client.retrieve("fix the login bug", trust=trust, limit=5)
for r in records:
    print(f"  [{r.type.value}] {r.id} (salience={r.salience:.2f})")

# Reinforce a useful memory
client.reinforce(record.id, actor="agent-1", rationale="Used successfully")

# Clean up
client.close()

Context manager

with MembraneClient("localhost:9090") as client:
    record = client.ingest_observation(
        subject="user",
        predicate="prefers",
        obj={"language": "Python"},
        sensitivity=Sensitivity.LOW,
    )

MembraneClient

Constructor

MembraneClient(
    addr: str = "localhost:9090",
    *,
    tls: bool = False,
    tls_ca_cert: str | None = None,
    api_key: str | None = None,
    timeout: float | None = None,
)
addr
str
gRPC server address in host:port format. Defaults to "localhost:9090".
tls
bool
Enable TLS transport. When True and tls_ca_cert is not provided, system root certificates are used.
tls_ca_cert
str | None
Path to a PEM-encoded CA certificate for server verification. Implies tls=True.
api_key
str | None
Bearer token sent as authorization metadata on every RPC.
timeout
float | None
Default timeout in seconds for all RPC calls. None means no timeout.

TLS and authentication example

client = MembraneClient(
    "membrane.example.com:443",
    tls=True,                        # use TLS transport
    tls_ca_cert="/path/to/ca.pem",   # optional custom CA
    api_key="your-api-key",          # Bearer token auth
    timeout=10.0,                    # default timeout in seconds
)

Ingestion methods

ingest_event

Create an episodic record from an event.
def ingest_event(
    self,
    event_kind: str,
    ref: str,
    *,
    summary: str = "",
    sensitivity: Sensitivity | str = Sensitivity.LOW,
    source: str = "python-client",
    tags: Sequence[str] | None = None,
    scope: str = "",
    timestamp: str | None = None,
) -> MemoryRecord
event_kind
str
required
Kind of event, e.g. "file_edit", "tool_call", "error".
ref
str
required
Reference identifier for the event source.
summary
str
Human-readable summary of the event.
sensitivity
Sensitivity | str
Sensitivity classification. Defaults to Sensitivity.LOW.
source
str
Provenance source identifier. Defaults to "python-client".
tags
Sequence[str] | None
Tags for categorization and retrieval filtering.
scope
str
Visibility scope for the record.
timestamp
str | None
RFC 3339 timestamp string (e.g. "2025-01-15T10:30:00Z"). When omitted, the current time is used.
record = client.ingest_event(
    source="my-agent",
    event_kind="tool_call",
    ref="task#1",
    summary="Ran database migration successfully",
    tags=["db", "migration"],
)

ingest_tool_output

Create an episodic record from a tool invocation.
def ingest_tool_output(
    self,
    tool_name: str,
    *,
    args: dict[str, Any] | None = None,
    result: Any = None,
    sensitivity: Sensitivity | str = Sensitivity.LOW,
    source: str = "python-client",
    depends_on: Sequence[str] | None = None,
    tags: Sequence[str] | None = None,
    scope: str = "",
    timestamp: str | None = None,
) -> MemoryRecord
tool_name
str
required
Name of the tool that produced output.
args
dict[str, Any] | None
Arguments passed to the tool.
result
Any
Result returned by the tool (any JSON-serializable value).
depends_on
Sequence[str] | None
IDs of records this output depends on.
record = client.ingest_tool_output(
    "run_tests",
    args={"suite": "auth"},
    result={"passed": 42, "failed": 0},
    tags=["tests", "auth"],
)

ingest_observation

Create a semantic record from a subject-predicate-object triple.
def ingest_observation(
    self,
    subject: str,
    predicate: str,
    obj: Any,
    *,
    sensitivity: Sensitivity | str = Sensitivity.LOW,
    source: str = "python-client",
    tags: Sequence[str] | None = None,
    scope: str = "",
    timestamp: str | None = None,
) -> MemoryRecord
subject
str
required
The subject of the observation.
predicate
str
required
The predicate relating subject to object.
obj
Any
required
The object value (any JSON-serializable value).
record = client.ingest_observation(
    subject="user",
    predicate="prefers",
    obj={"language": "Python"},
    sensitivity=Sensitivity.LOW,
)

ingest_outcome

Attach an outcome to an existing episodic record.
def ingest_outcome(
    self,
    target_record_id: str,
    outcome_status: OutcomeStatus | str,
    *,
    source: str = "python-client",
    timestamp: str | None = None,
) -> MemoryRecord
target_record_id
str
required
ID of the record to attach the outcome to.
outcome_status
OutcomeStatus | str
required
One of "success", "failure", or "partial".
from membrane import OutcomeStatus

client.ingest_outcome(record.id, OutcomeStatus.SUCCESS)

ingest_working_state

Create a working memory snapshot for a task thread.
def ingest_working_state(
    self,
    thread_id: str,
    state: str,
    *,
    next_actions: Sequence[str] | None = None,
    open_questions: Sequence[str] | None = None,
    context_summary: str = "",
    active_constraints: Sequence[dict[str, Any]] | None = None,
    sensitivity: Sensitivity | str = Sensitivity.LOW,
    source: str = "python-client",
    tags: Sequence[str] | None = None,
    scope: str = "",
    timestamp: str | None = None,
) -> MemoryRecord
thread_id
str
required
Identifier for the task thread.
state
str
required
Current task state: "planning", "executing", "blocked", "waiting", or "done".
next_actions
Sequence[str] | None
Planned next steps.
open_questions
Sequence[str] | None
Unresolved questions.
context_summary
str
Human-readable summary of current context.
active_constraints
Sequence[dict[str, Any]] | None
Active constraints as JSON-serializable dicts.
client.ingest_working_state(
    thread_id="session-001",
    state="executing",
    next_actions=["run tests", "deploy"],
    context_summary="Backend initialized, frontend pending, docs TODO",
)

Retrieval methods

retrieve

Retrieve memory records relevant to a task descriptor. This backward-compatible helper returns only the list of records; use retrieve_with_selection to also receive selector metadata.
def retrieve(
    self,
    task_descriptor: str,
    *,
    trust: TrustContext | None = None,
    memory_types: Sequence[MemoryType | str] | None = None,
    min_salience: float = 0.0,
    limit: int = 10,
) -> list[MemoryRecord]
task_descriptor
str
required
Natural-language description of the current task.
trust
TrustContext | None
Trust context controlling access. Defaults to a minimal context with Sensitivity.LOW.
memory_types
Sequence[MemoryType | str] | None
Filter by memory types: "episodic", "working", "semantic", "competence", "plan_graph".
min_salience
float
Minimum salience threshold. Defaults to 0.0.
limit
int
Maximum records to return. Defaults to 10.
trust = TrustContext(
    max_sensitivity=Sensitivity.MEDIUM,
    authenticated=True,
    actor_id="agent-1",
)
records = client.retrieve(
    "database operations",
    trust=trust,
    memory_types=["semantic", "competence"],
    limit=5,
)

retrieve_with_selection

Retrieve records plus optional selector metadata (the selected records, a confidence score, and a needs_more flag).
def retrieve_with_selection(
    self,
    task_descriptor: str,
    *,
    trust: TrustContext | None = None,
    memory_types: Sequence[MemoryType | str] | None = None,
    min_salience: float = 0.0,
    limit: int = 10,
) -> RetrieveResult
Returns a RetrieveResult dataclass:
@dataclass
class RetrieveResult:
    records: list[MemoryRecord]
    selection: SelectionResult | None  # optional selector metadata

@dataclass
class SelectionResult:
    selected: list[MemoryRecord]
    confidence: float
    needs_more: bool

retrieve_by_id

Fetch a single record by its ID.
def retrieve_by_id(
    self,
    record_id: str,
    *,
    trust: TrustContext | None = None,
) -> MemoryRecord
record = client.retrieve_by_id("rec-uuid-here")

Revision methods

supersede

Replace a record with a new version.
def supersede(
    self,
    old_id: str,
    new_record: dict[str, Any] | MemoryRecord,
    actor: str,
    rationale: str,
) -> MemoryRecord
updated = client.supersede(
    old_record.id,
    {**old_record.to_dict(), "payload": {"version": "2.0"}},
    actor="agent-1",
    rationale="Updated version field",
)

fork

Create a conditional variant of a record.
def fork(
    self,
    source_id: str,
    forked_record: dict[str, Any] | MemoryRecord,
    actor: str,
    rationale: str,
) -> MemoryRecord
variant = client.fork(
    source_record.id,
    {**source_record.to_dict(), "scope": "dev"},
    actor="agent-1",
    rationale="Different behavior for dev environment",
)

retract

Soft-delete a record.
def retract(
    self,
    record_id: str,
    actor: str,
    rationale: str,
) -> None
client.retract(record.id, actor="agent-1", rationale="No longer accurate")

merge

Combine multiple records into a single consolidated record.
def merge(
    self,
    record_ids: Sequence[str],
    merged_record: dict[str, Any] | MemoryRecord,
    actor: str,
    rationale: str,
) -> MemoryRecord
merged = client.merge(
    [id1, id2, id3],
    {"type": "semantic", "payload": {"fact": "combined knowledge"}},
    actor="agent-1",
    rationale="Consolidating duplicate semantic records",
)

contest

Mark a record as contested due to conflicting evidence.
def contest(
    self,
    record_id: str,
    contesting_ref: str,
    actor: str,
    rationale: str,
) -> None
client.contest(
    record.id,
    conflicting_record.id,
    actor="agent-1",
    rationale="New evidence contradicts this",
)

Reinforcement methods

reinforce

Boost a record’s salience score.
def reinforce(
    self,
    record_id: str,
    actor: str,
    rationale: str,
) -> None
client.reinforce(record.id, actor="agent-1", rationale="Used successfully in task")

penalize

Reduce a record’s salience score.
def penalize(
    self,
    record_id: str,
    amount: float,
    actor: str,
    rationale: str,
) -> None
client.penalize(record.id, amount=0.2, actor="agent-1", rationale="Led to incorrect result")

get_metrics

Retrieve a point-in-time metrics snapshot from the daemon.
def get_metrics(self) -> dict[str, Any]
metrics = client.get_metrics()
print(metrics["total_records"], metrics["avg_salience"])

close

Close the underlying gRPC channel. The context manager calls this automatically.
def close(self) -> None

Core types

Sensitivity

class Sensitivity(str, Enum):
    PUBLIC = "public"
    LOW    = "low"
    MEDIUM = "medium"
    HIGH   = "high"
    HYPER  = "hyper"
Sensitivity controls access during retrieval: records whose sensitivity exceeds the caller’s max_sensitivity are returned in redacted form.

TrustContext

@dataclass
class TrustContext:
    max_sensitivity: Sensitivity = Sensitivity.LOW  # highest sensitivity the caller may access
    authenticated: bool = False
    actor_id: str = ""
    scopes: list[str] = field(default_factory=list)

MemoryRecord

@dataclass
class MemoryRecord:
    id: str
    type: MemoryType           # "episodic" | "working" | "semantic" | "competence" | "plan_graph"
    sensitivity: Sensitivity
    confidence: float          # 0–1 applicability score
    salience: float            # 0–1 current importance score
    scope: str
    tags: list[str]
    created_at: str
    updated_at: str
    lifecycle: Lifecycle | None
    provenance: Provenance | None
    relations: list[Relation]
    payload: Any               # type-specific structured payload
    audit_log: list[AuditEntry]

MemoryType

class MemoryType(str, Enum):
    EPISODIC   = "episodic"
    WORKING    = "working"
    SEMANTIC   = "semantic"
    COMPETENCE = "competence"
    PLAN_GRAPH = "plan_graph"

OutcomeStatus

class OutcomeStatus(str, Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"

Relation

@dataclass
class Relation:
    target_id: str = ""   # ID of the related MemoryRecord
    kind: str = ""        # relationship type (maps to Go Predicate)
    weight: float = 1.0   # strength of the relationship [0, 1]
The Python Relation.kind field maps to Go’s schema.Relation.Predicate. Common values: supports, contradicts, derived_from, supersedes, contested_by.

Additional exported types

All of the following are importable from the top-level membrane package:
from membrane import (
    AuditAction,     # Enum: create, revise, fork, merge, delete, reinforce, decay
    AuditEntry,      # Dataclass: action, actor, timestamp, rationale
    DecayCurve,      # Enum: EXPONENTIAL = "exponential"
    DecayProfile,    # Dataclass: curve, half_life_seconds
    DeletionPolicy,  # Enum: AUTO_PRUNE, MANUAL_ONLY, NEVER
    EdgeKind,        # Enum: DATA = "data", CONTROL = "control"
    Lifecycle,       # Dataclass: decay, last_reinforced_at, deletion_policy
    ProvenanceKind,  # Enum: EVENT, ARTIFACT, TOOL_CALL, OBSERVATION, OUTCOME
    ProvenanceSource,# Dataclass: kind, ref, timestamp
    Provenance,      # Dataclass: sources (list[ProvenanceSource])
    Relation,        # Dataclass: target_id, kind, weight
    RetrieveResult,  # Dataclass: records, selection
    RevisionStatus,  # Enum: ACTIVE, CONTESTED, RETRACTED
    SelectionResult, # Dataclass: selected, confidence, needs_more
    TaskState,       # Enum: PLANNING, EXECUTING, BLOCKED, WAITING, DONE
    ValidityMode,    # Enum: GLOBAL, CONDITIONAL, TIMEBOXED
)

Error handling

Failed RPCs raise grpc.RpcError. Call e.code() for the gRPC status code and e.details() for the human-readable error message:
import grpc
from membrane import MembraneClient

client = MembraneClient("localhost:9090")
try:
    record = client.retrieve_by_id("nonexistent-id")
except grpc.RpcError as e:
    print(e.code())     # e.g. StatusCode.NOT_FOUND
    print(e.details())  # human-readable message
finally:
    client.close()

Build docs developers (and LLMs) love