Skip to main content
Import path: github.com/GustyCube/membrane/pkg/ingestion The ingestion package classifies, validates, and persists raw data as MemoryRecord instances. It coordinates the Classifier, PolicyEngine, and storage.Store.
You can call all ingestion methods through the top-level Membrane facade without importing this package directly. Import pkg/ingestion only when constructing a Service independently.

Service

type Service struct { ... }

func NewService(store storage.Store, classifier *Classifier, policy *PolicyEngine) *Service
Orchestrates ingestion. In normal usage Membrane.New wires this together automatically.

IngestEvent

func (s *Service) IngestEvent(ctx context.Context, req IngestEventRequest) (*schema.MemoryRecord, error)
Creates an episodic MemoryRecord from a discrete event such as a user input, system alert, or error.

IngestEventRequest

Source
string
required
Identifies the actor or system that produced the event. Used in provenance tracking.
EventKind
string
required
Type of event. Examples: user_input, error, system.
Ref
string
required
Reference identifier for the source event within the host system.
Summary
string
Optional human-readable summary of the event.
Timestamp
time.Time
When the event occurred. Defaults to time.Now().UTC() when zero.
Tags
[]string
Optional labels for categorisation and retrieval.
Scope
string
Visibility scope (e.g., user, project).
Sensitivity
schema.Sensitivity
Overrides the default sensitivity level when set.
rec, err := m.IngestEvent(ctx, ingestion.IngestEventRequest{
    Source:    "agent-core",
    EventKind: "user_input",
    Ref:       "msg-abc123",
    Summary:   "User asked about deployment strategy",
    Tags:      []string{"deployment", "strategy"},
})

IngestToolOutput

func (s *Service) IngestToolOutput(ctx context.Context, req IngestToolOutputRequest) (*schema.MemoryRecord, error)
Creates an episodic MemoryRecord with a populated ToolGraph from a tool invocation.

IngestToolOutputRequest

Source
string
required
Identifies the actor or system that invoked the tool.
ToolName
string
required
Name or identifier of the invoked tool.
Args
map[string]any
Arguments passed to the tool.
Result
any
Output produced by the tool.
DependsOn
[]string
IDs of tool nodes this output depends on, forming a tool graph.
Timestamp
time.Time
When the tool was invoked. Defaults to time.Now().UTC() when zero.
Tags
[]string
Optional labels for categorisation.
Scope
string
Visibility scope.
Sensitivity
schema.Sensitivity
Overrides the default sensitivity level when set.
rec, err := m.IngestToolOutput(ctx, ingestion.IngestToolOutputRequest{
    Source:   "agent-core",
    ToolName: "run_bash",
    Args:     map[string]any{"command": "ls -la"},
    Result:   "total 8\ndrwxr-xr-x ...",
})

IngestObservation

func (s *Service) IngestObservation(ctx context.Context, req IngestObservationRequest) (*schema.MemoryRecord, error)
Creates a semantic MemoryRecord from a structured subject-predicate-object observation. The resulting record has ValidityMode: global and RevisionPolicy: replace by default.

IngestObservationRequest

Source
string
required
Identifies the actor or system that made the observation.
Subject
string
required
Entity the observation is about.
Predicate
string
required
Relationship or property being observed.
Object
any
required
Value or related entity being observed.
Timestamp
time.Time
When the observation was made. Defaults to time.Now().UTC() when zero.
Tags
[]string
Optional labels.
Scope
string
Visibility scope.
Sensitivity
schema.Sensitivity
Overrides the default sensitivity level when set.
rec, err := m.IngestObservation(ctx, ingestion.IngestObservationRequest{
    Source:    "agent-core",
    Subject:   "user:alice",
    Predicate: "prefers_language",
    Object:    "Go",
    Tags:      []string{"preference"},
})

IngestOutcome

func (s *Service) IngestOutcome(ctx context.Context, req IngestOutcomeRequest) (*schema.MemoryRecord, error)
Updates an existing episodic record with outcome data. Fetches TargetRecordID, sets EpisodicPayload.Outcome, appends a provenance source, and persists the update. Returns an error if the target record is not episodic.

IngestOutcomeRequest

Source
string
required
Identifies the actor or system reporting the outcome.
TargetRecordID
string
required
ID of the existing episodic record to update.
OutcomeStatus
schema.OutcomeStatus
required
Result of the episode. One of success, failure, or partial.
Timestamp
time.Time
When the outcome was determined. Defaults to time.Now().UTC() when zero.
updated, err := m.IngestOutcome(ctx, ingestion.IngestOutcomeRequest{
    Source:         "agent-core",
    TargetRecordID: rec.ID,
    OutcomeStatus:  schema.OutcomeStatusSuccess,
})

IngestWorkingState

func (s *Service) IngestWorkingState(ctx context.Context, req IngestWorkingStateRequest) (*schema.MemoryRecord, error)
Creates a working MemoryRecord from a snapshot of the current task state. Working records are intended to be replaced (or discarded) when a task ends.

IngestWorkingStateRequest

Source
string
required
Identifies the actor or system producing the working state.
ThreadID
string
required
Identifier for the current thread or session.
State
schema.TaskState
required
Current task state. One of planning, executing, blocked, waiting, done.
NextActions
[]string
List of next planned actions.
OpenQuestions
[]string
Unresolved questions for the task.
ContextSummary
string
Human-readable summary of the current context.
ActiveConstraints
[]schema.Constraint
Constraints currently active for the task execution.
Timestamp
time.Time
When the working state was captured. Defaults to time.Now().UTC() when zero.
Tags
[]string
Optional labels.
Scope
string
Visibility scope.
Sensitivity
schema.Sensitivity
Overrides the default sensitivity level when set.
rec, err := m.IngestWorkingState(ctx, ingestion.IngestWorkingStateRequest{
    Source:         "agent-core",
    ThreadID:       "session-xyz",
    State:          schema.TaskStateExecuting,
    NextActions:    []string{"run tests", "review output"},
    ContextSummary: "Refactoring the auth module",
})

Default confidence by source type

The PolicyEngine assigns an initial Confidence value based on source type:
Source kindDefault confidence
Event0.8
Tool output0.9
Observation0.7
Outcome0.85
Working state1.0

Default half-life by memory type

The PolicyEngine assigns a decay half-life based on the classified memory type:
Memory typeDefault half-life
episodic1 hour (3 600 s)
working1 day (86 400 s)
semantic30 days (2 592 000 s)
competence30 days (2 592 000 s)
plan_graph30 days (2 592 000 s)

Build docs developers (and LLMs) love