
Overview

GraphShard is a self-contained subgraph type that packages nodes, edges, and metadata into a single Cowrie value. It is designed for:
  • GNN Mini-Batch Checkpointing: Save/restore training state with full subgraph context
  • Distributed Graph Processing: Partition large graphs across workers (e.g., GraphX, Pregel)
  • Graph Database Snapshots: Export/import complete subgraphs with metadata
  • Streaming Graph Partitions: Send graph fragments over network with provenance
Packaging all three in one atomic unit keeps a shard consistent during distributed operations.

Wire Format

Gen1 Format (Tag 0x15)

Tag(0x15) | nodeCount:varint | Node* | edgeCount:varint | Edge* | metaCount:varint | (keyLen:varint | keyBytes | value)*
Structure:
  1. Node count followed by Gen1 Node elements (tag 0x10)
  2. Edge count followed by Gen1 Edge elements (tag 0x11)
  3. Metadata as inline key-value pairs (like Object)

Gen2 Format (Tag 0x39)

Tag(0x39) | nodeCount:varint | Node* | edgeCount:varint | Edge* | metaCount:varint | (dictIdx:varint | value)*
Structure:
  1. Node count followed by Gen2 Node elements (tag 0x35)
  2. Edge count followed by Gen2 Edge elements (tag 0x36)
  3. Metadata with dictionary-coded keys (shared with nodes/edges)
All property keys (node props, edge props, metadata) share the same dictionary in Gen2, maximizing compression.
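
The effect of the shared dictionary is easy to see in isolation. The sketch below is not the Cowrie encoder; it assumes plain LEB128 varints (Go's encoding/binary) and ignores the surrounding tag/Node/Edge structure. It only contrasts inline keys with dictionary-coded key indices:
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// putUvarint appends v to buf as an unsigned LEB128 varint.
func putUvarint(buf *bytes.Buffer, v uint64) {
    var tmp [binary.MaxVarintLen64]byte
    n := binary.PutUvarint(tmp[:], v)
    buf.Write(tmp[:n])
}

func main() {
    // The same property keys repeat across nodes, edges, and metadata.
    keys := []string{"weight", "weight", "weight", "timestamp", "timestamp"}

    // Gen1-style: every occurrence carries keyLen + keyBytes inline.
    var gen1 bytes.Buffer
    for _, k := range keys {
        putUvarint(&gen1, uint64(len(k)))
        gen1.WriteString(k)
    }

    // Gen2-style: each distinct key is written once into a shared dictionary;
    // every occurrence is just a varint index into that dictionary.
    var dict, gen2 bytes.Buffer
    index := map[string]uint64{}
    for _, k := range keys {
        idx, ok := index[k]
        if !ok {
            idx = uint64(len(index))
            index[k] = idx
            putUvarint(&dict, uint64(len(k)))
            dict.WriteString(k)
        }
        putUvarint(&gen2, idx)
    }

    fmt.Printf("inline keys: %d bytes; dict-coded refs: %d bytes + %d-byte dictionary\n",
        gen1.Len(), gen2.Len(), dict.Len())
}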

Logical Structure

A GraphShard conceptually represents:
{
  "nodes": [
    {"id": "1", "labels": ["Node"], "props": {"x": 0.1}},
    {"id": "2", "labels": ["Node"], "props": {"x": 0.2}}
  ],
  "edges": [
    {"from": "1", "to": "2", "type": "EDGE", "props": {"weight": 0.85}}
  ],
  "metadata": {
    "version": 1,
    "partitionId": 42,
    "timestamp": 1677649200,
    "source": "graph_db_snapshot"
  }
}
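
In Go terms, the same shape could be modeled roughly as follows. This is an illustrative struct, not an exported Cowrie type; the streaming examples below use graph.NodeEvent and graph.EdgeEvent instead:
// Hypothetical in-memory mirror of the logical structure above.
// Field names simply follow the JSON keys shown in this section.
type GraphShard struct {
    Nodes    []ShardNode    `json:"nodes"`
    Edges    []ShardEdge    `json:"edges"`
    Metadata map[string]any `json:"metadata"`
}

type ShardNode struct {
    ID     string         `json:"id"`
    Labels []string       `json:"labels"`
    Props  map[string]any `json:"props"`
}

type ShardEdge struct {
    From  string         `json:"from"`
    To    string         `json:"to"`
    Type  string         `json:"type"`
    Props map[string]any `json:"props"`
}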

Use Cases

1. GNN Mini-Batch Checkpointing

Save a complete training batch together with its neighborhood context:
package main

import (
    "bytes"
    "github.com/Neumenon/cowrie"
    "github.com/Neumenon/cowrie/graph"
)

func createGNNCheckpoint(nodes []*graph.NodeEvent, edges []*graph.EdgeEvent, epoch int, batchID int) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)
    
    // Pre-populate common keys
    sw.Header().AddLabel("Node")
    sw.Header().AddLabel("EDGE")
    sw.Header().AddField("features")
    sw.Header().AddField("label")
    sw.Header().AddField("weight")
    sw.Header().AddField("epoch")
    sw.Header().AddField("batchId")
    sw.Header().AddField("timestamp")
    
    sw.WriteHeader()
    
    // Write nodes (mini-batch)
    for _, node := range nodes {
        sw.WriteNode(node)
    }
    
    // Write edges (neighborhood)
    for _, edge := range edges {
        sw.WriteEdge(edge)
    }
    
    // Write checkpoint metadata
    // Note: GraphShard metadata is conceptual; in streaming format,
    // metadata is stored in stream header or as special events
    
    sw.Close()
    return buf.Bytes()
}

func restoreGNNCheckpoint(data []byte) ([]*graph.NodeEvent, []*graph.EdgeEvent, error) {
    sr, err := graph.NewStreamReader(data)
    if err != nil {
        return nil, nil, err
    }
    
    var nodes []*graph.NodeEvent
    var edges []*graph.EdgeEvent
    
    for {
        evt, err := sr.Next()
        if err != nil {
            return nil, nil, err
        }
        if evt == nil {
            break
        }
        
        switch evt.Kind {
        case graph.EventNode:
            nodes = append(nodes, evt.Node)
        case graph.EventEdge:
            edges = append(edges, evt.Edge)
        }
    }
    
    return nodes, edges, nil
}

2. Distributed Graph Partitioning

Partition a large graph across workers:
type GraphPartition struct {
    ID        int
    NodeIDs   []string
    Boundary  map[string][]string // Cross-partition edges
}

func partitionGraph(g *Graph, numPartitions int) []*GraphPartition {
    // Use METIS, spectral clustering, etc.
    return metisPartition(g, numPartitions)
}

func createShardForPartition(g *Graph, partition *GraphPartition) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)
    
    sw.Header().AddLabel("Node")
    sw.Header().AddLabel("EDGE")
    sw.Header().AddField("partitionId")
    sw.Header().AddField("nodeCount")
    
    sw.WriteHeader()
    
    // Write partition nodes
    for _, nodeID := range partition.NodeIDs {
        node := g.GetNode(nodeID)
        sw.WriteNode(&graph.NodeEvent{
            Op:     graph.OpUpsert,
            ID:     nodeID,
            Labels: node.Labels,
            Props:  node.Props,
        })
    }
    
    // Write internal edges
    for _, nodeID := range partition.NodeIDs {
        for _, edge := range g.GetOutgoingEdges(nodeID) {
            // Only include edges within partition
            if contains(partition.NodeIDs, edge.To) {
                sw.WriteEdge(&graph.EdgeEvent{
                    Op:     graph.OpUpsert,
                    Label:  edge.Label,
                    FromID: edge.From,
                    ToID:   edge.To,
                    Props:  edge.Props,
                })
            }
        }
    }
    
    sw.Close()
    return buf.Bytes()
}

// Distribute shards to workers
func distributeGraph(g *Graph, workers []string) {
    partitions := partitionGraph(g, len(workers))
    
    for i, partition := range partitions {
        shardData := createShardForPartition(g, partition)
        sendToWorker(workers[i], shardData)
    }
}

3. Graph Database Snapshot

Export a complete subgraph with provenance:
func snapshotSubgraph(db *GraphDB, rootID string, depth int) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)
    
    // Collect subgraph via BFS/DFS
    nodes, edges := traverseSubgraph(db, rootID, depth)
    
    sw.Header().AddField("snapshotTime")
    sw.Header().AddField("rootNode")
    sw.Header().AddField("depth")
    sw.Header().AddField("nodeCount")
    sw.Header().AddField("edgeCount")
    
    sw.WriteHeader()
    
    // Write nodes
    for _, node := range nodes {
        sw.WriteNode(node)
    }
    
    // Write edges
    for _, edge := range edges {
        sw.WriteEdge(edge)
    }
    
    sw.Close()
    return buf.Bytes()
}

func restoreSubgraph(db *GraphDB, shardData []byte) error {
    sr, err := graph.NewStreamReader(shardData)
    if err != nil {
        return err
    }
    
    tx := db.BeginTx()
    defer tx.Rollback()
    
    events, err := sr.ReadAll()
    if err != nil {
        return err
    }
    
    for _, evt := range events {
        switch evt.Kind {
        case graph.EventNode:
            if err := tx.CreateNode(evt.Node); err != nil {
                return err
            }
        case graph.EventEdge:
            if err := tx.CreateEdge(evt.Edge); err != nil {
                return err
            }
        }
    }
    
    return tx.Commit()
}

4. Streaming Graph Analytics

Process a graph in partitioned chunks:
type GraphAnalyzer struct {
    workers   int
    batchSize int
}

func (a *GraphAnalyzer) PageRank(g *Graph) map[string]float64 {
    // Partition graph
    partitions := partitionGraph(g, a.workers)
    
    // Create shards
    shards := make([][]byte, len(partitions))
    for i, partition := range partitions {
        shards[i] = createShardForPartition(g, partition)
    }
    
    // Distribute to workers
    results := make(chan map[string]float64, a.workers)
    for _, shard := range shards {
        go func(data []byte) {
            results <- computeLocalPageRank(data)
        }(shard)
    }
    
    // Aggregate results
    globalScores := make(map[string]float64)
    for i := 0; i < a.workers; i++ {
        localScores := <-results
        for id, score := range localScores {
            globalScores[id] += score
        }
    }
    
    return globalScores
}

func computeLocalPageRank(shardData []byte) map[string]float64 {
    sr, _ := graph.NewStreamReader(shardData) // error handling elided for brevity
    
    // Load shard into memory
    nodes := make(map[string]*graph.NodeEvent)
    edges := make([]*graph.EdgeEvent, 0)
    
    for {
        evt, err := sr.Next()
        if err != nil || evt == nil {
            break
        }
        
        if evt.Kind == graph.EventNode {
            nodes[evt.Node.ID] = evt.Node
        } else if evt.Kind == graph.EventEdge {
            edges = append(edges, evt.Edge)
        }
    }
    
    // Run local PageRank iterations
    return runPageRank(nodes, edges)
}

Metadata Fields

Common metadata fields for GraphShards:
// Partition metadata
metadata := map[string]any{
    "partitionId":    42,
    "totalPartitions": 100,
    "nodeCount":      1000,
    "edgeCount":      5000,
}

// Snapshot metadata
metadata := map[string]any{
    "version":       "v1.2.3",
    "snapshotTime":  time.Now().Unix(),
    "rootNode":      "alice",
    "depth":         3,
    "source":        "neo4j",
}

// GNN checkpoint metadata
metadata := map[string]any{
    "epoch":         10,
    "batchId":       42,
    "loss":          0.123,
    "accuracy":      0.89,
    "timestamp":     time.Now().Unix(),
    "modelVersion":  "gnn_v2",
}

// Streaming analytics metadata
metadata := map[string]any{
    "windowStart":   startTime,
    "windowEnd":     endTime,
    "algorithm":     "pagerank",
    "iterations":    20,
    "convergence":   0.0001,
}

Complete Example: Graph Partitioning System

package main

import (
    "bytes"
    "fmt"
    "sync"
    "github.com/Neumenon/cowrie/graph"
)

// GraphShardManager manages distributed graph processing
type GraphShardManager struct {
    graph      *Graph
    numShards  int
    shards     [][]byte
}

func NewGraphShardManager(g *Graph, numShards int) *GraphShardManager {
    return &GraphShardManager{
        graph:     g,
        numShards: numShards,
    }
}

func (m *GraphShardManager) CreateShards() error {
    // Partition graph
    partitions := partitionGraph(m.graph, m.numShards)
    m.shards = make([][]byte, len(partitions))
    
    // Create shard for each partition
    for i, partition := range partitions {
        var buf bytes.Buffer
        sw := graph.NewStreamWriter(&buf)
        
        // Add common labels/fields
        sw.Header().AddLabel("Node")
        sw.Header().AddLabel("EDGE")
        sw.Header().AddField("partitionId")
        sw.Header().AddField("nodeCount")
        sw.Header().AddField("edgeCount")
        
        sw.WriteHeader()
        
        nodeCount := 0
        edgeCount := 0
        
        // Write nodes in partition
        for _, nodeID := range partition.NodeIDs {
            node := m.graph.GetNode(nodeID)
            sw.WriteNode(&graph.NodeEvent{
                Op:     graph.OpUpsert,
                ID:     nodeID,
                Labels: node.Labels,
                Props:  node.Props,
            })
            nodeCount++
        }
        
        // Write internal edges
        for _, nodeID := range partition.NodeIDs {
            for _, edge := range m.graph.GetOutgoingEdges(nodeID) {
                if contains(partition.NodeIDs, edge.To) {
                    sw.WriteEdge(&graph.EdgeEvent{
                        Op:     graph.OpUpsert,
                        Label:  edge.Label,
                        FromID: edge.From,
                        ToID:   edge.To,
                        Props:  edge.Props,
                    })
                    edgeCount++
                }
            }
        }
        
        sw.Close()
        m.shards[i] = buf.Bytes()
        
        fmt.Printf("Shard %d: %d nodes, %d edges, %d bytes\n",
            i, nodeCount, edgeCount, len(buf.Bytes()))
    }
    
    return nil
}

func (m *GraphShardManager) ProcessInParallel(fn func([]byte) map[string]float64) map[string]float64 {
    var wg sync.WaitGroup
    resultsChan := make(chan map[string]float64, m.numShards)
    
    // Process each shard in parallel
    for _, shard := range m.shards {
        wg.Add(1)
        go func(data []byte) {
            defer wg.Done()
            result := fn(data)
            resultsChan <- result
        }(shard)
    }
    
    // Wait and close channel
    go func() {
        wg.Wait()
        close(resultsChan)
    }()
    
    // Aggregate results
    globalResult := make(map[string]float64)
    for localResult := range resultsChan {
        for id, value := range localResult {
            globalResult[id] += value
        }
    }
    
    return globalResult
}

func main() {
    // Load large graph
    g := LoadGraph("large_graph.json")
    
    // Create shard manager
    manager := NewGraphShardManager(g, 8)
    manager.CreateShards()
    
    // Run PageRank in parallel across shards
    scores := manager.ProcessInParallel(func(shardData []byte) map[string]float64 {
        return computeLocalPageRank(shardData)
    })
    
    // Print top nodes
    topNodes := getTopK(scores, 10)
    for i, node := range topNodes {
        fmt.Printf("%d. %s: %.6f\n", i+1, node.ID, node.Score)
    }
}

Performance Characteristics

Shard Size Guidelines

| Graph Size         | Nodes per Shard | Edges per Shard | Memory per Shard |
| ------------------ | --------------- | --------------- | ---------------- |
| Small (1K-10K)     | 100-1K          | 1K-10K          | 10-100 KB        |
| Medium (10K-1M)    | 1K-10K          | 10K-100K        | 100 KB-1 MB      |
| Large (1M-100M)    | 10K-100K        | 100K-1M         | 1-10 MB          |
| Very Large (100M+) | 100K-1M         | 1M-10M          | 10-100 MB        |
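
Expressed as code, a small helper can return the suggested nodes-per-shard bracket straight from the table. This is a heuristic restatement of the guidelines, not a hard rule:
// nodesPerShard maps total graph size to the suggested nodes-per-shard
// range from the guideline table above.
func nodesPerShard(totalNodes int) (min, max int) {
    switch {
    case totalNodes <= 10_000: // Small
        return 100, 1_000
    case totalNodes <= 1_000_000: // Medium
        return 1_000, 10_000
    case totalNodes <= 100_000_000: // Large
        return 10_000, 100_000
    default: // Very Large
        return 100_000, 1_000_000
    }
}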

Dictionary Compression

GraphShard benefits significantly from dictionary coding:
Without dictionary (Gen1):
- 10K nodes × 50 bytes/node = 500KB
- 50K edges × 40 bytes/edge = 2MB
- Total: 2.5MB

With dictionary (Gen2):
- Dictionary: 100 keys × 20 bytes = 2KB
- 10K nodes × 10 bytes/node = 100KB
- 50K edges × 8 bytes/edge = 400KB
- Total: 502KB (80% reduction!)

Parallelization Efficiency

// Measure speedup with parallel processing
func benchmarkParallelShards(g *Graph) {
    for numShards := 1; numShards <= 16; numShards *= 2 {
        start := time.Now()
        
        manager := NewGraphShardManager(g, numShards)
        manager.CreateShards()
        manager.ProcessInParallel(computeLocalPageRank)
        
        elapsed := time.Since(start)
        fmt.Printf("Shards: %d, Time: %v\n", numShards, elapsed)
    }
}

// Typical results:
// Shards: 1,  Time: 10.5s  (baseline)
// Shards: 2,  Time: 5.8s   (1.8× speedup)
// Shards: 4,  Time: 3.2s   (3.3× speedup)
// Shards: 8,  Time: 1.9s   (5.5× speedup)
// Shards: 16, Time: 1.4s   (7.5× speedup)

Type Tags Reference

| Format | Tag  | Name       | Structure                               |
| ------ | ---- | ---------- | --------------------------------------- |
| Gen1   | 0x15 | GraphShard | nodes + edges + metadata (inline keys)  |
| Gen2   | 0x39 | GraphShard | nodes + edges + metadata (dict-coded)   |
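
If you need the tags in code, they can be mirrored as constants (illustrative names; the canonical identifiers live in the cowrie source):
// Tag values from the table above.
const (
    TagGraphShardGen1 = 0x15 // Gen1 GraphShard
    TagGraphShardGen2 = 0x39 // Gen2 GraphShard
)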

Boundary Edges

Handle cross-shard edges:
type ShardWithBoundary struct {
    InternalNodes []string
    InternalEdges []*graph.EdgeEvent
    BoundaryEdges []*graph.EdgeEvent  // Edges to other shards
}

func createShardWithBoundary(g *Graph, partition *GraphPartition) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)
    
    sw.WriteHeader()
    
    // Write internal nodes
    for _, nodeID := range partition.NodeIDs {
        // ... write node
    }
    
    // Write internal edges
    for _, edge := range getInternalEdges(partition) {
        sw.WriteEdge(edge)
    }
    
    // Write boundary edges (marked with metadata)
    for _, edge := range getBoundaryEdges(partition) {
        edge.Props["boundary"] = true
        edge.Props["targetShard"] = getTargetShard(edge.To)
        sw.WriteEdge(edge)
    }
    
    sw.Close()
    return buf.Bytes()
}
For distributed graph algorithms (PageRank, connected components), track boundary edges to enable cross-shard communication.
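
On the receiving side, a worker can split a shard's edges on that "boundary" property before running the local algorithm. The sketch below assumes EdgeEvent.Props round-trips as a map[string]any, as the metadata examples above suggest:
// splitEdges separates internal edges from boundary edges in a shard
// produced by createShardWithBoundary.
func splitEdges(shardData []byte) ([]*graph.EdgeEvent, []*graph.EdgeEvent, error) {
    sr, err := graph.NewStreamReader(shardData)
    if err != nil {
        return nil, nil, err
    }
    
    var internal, boundary []*graph.EdgeEvent
    for {
        evt, err := sr.Next()
        if err != nil {
            return nil, nil, err
        }
        if evt == nil {
            break
        }
        if evt.Kind != graph.EventEdge {
            continue
        }
        // Edges written with Props["boundary"] = true point into another shard.
        if isBoundary, _ := evt.Edge.Props["boundary"].(bool); isBoundary {
            boundary = append(boundary, evt.Edge)
        } else {
            internal = append(internal, evt.Edge)
        }
    }
    return internal, boundary, nil
}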

Error Handling

func loadShard(data []byte) error {
    sr, err := graph.NewStreamReader(data)
    if err != nil {
        return fmt.Errorf("failed to parse shard: %w", err)
    }
    
    nodeCount := 0
    edgeCount := 0
    
    for {
        evt, err := sr.Next()
        if err != nil {
            return fmt.Errorf("error reading event %d: %w", 
                nodeCount+edgeCount, err)
        }
        if evt == nil {
            break
        }
        
        switch evt.Kind {
        case graph.EventNode:
            nodeCount++
        case graph.EventEdge:
            edgeCount++
        }
    }
    
    fmt.Printf("Loaded shard: %d nodes, %d edges\n", nodeCount, edgeCount)
    return nil
}
Always validate shard integrity after deserialization. Check that all edge endpoints exist in the shard’s node set for internal edges.
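
A sketch of that integrity check, using the NodeEvent/EdgeEvent fields from the examples above (edges marked "boundary" are allowed to point outside the shard; the Props type assertion assumes map[string]any):
// validateShard checks that every internal edge references node IDs
// that are present in the shard.
func validateShard(nodes []*graph.NodeEvent, edges []*graph.EdgeEvent) error {
    ids := make(map[string]struct{}, len(nodes))
    for _, n := range nodes {
        ids[n.ID] = struct{}{}
    }
    
    for _, e := range edges {
        if isBoundary, _ := e.Props["boundary"].(bool); isBoundary {
            continue // cross-shard edge; its endpoint lives in another shard
        }
        if _, ok := ids[e.FromID]; !ok {
            return fmt.Errorf("edge %s->%s: source node not in shard", e.FromID, e.ToID)
        }
        if _, ok := ids[e.ToID]; !ok {
            return fmt.Errorf("edge %s->%s: target node not in shard", e.FromID, e.ToID)
        }
    }
    return nil
}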
