

Overview

Gen1’s StreamDecoder provides safe, efficient streaming decoding for any io.Reader. It properly handles record boundaries and unconsumed bytes, working correctly with network connections, compressed streams, and other non-seekable readers.

StreamDecoder

Streaming decoder that maintains an internal buffer to handle incomplete records.
type StreamDecoder struct {
    // Internal fields (not exported)
}

NewStreamDecoder

Creates a new streaming decoder for the given reader.
func NewStreamDecoder(r io.Reader) *StreamDecoder
Parameters:
  • r (io.Reader, required): Any reader providing Gen1 Cowrie binary data. Supports seekable readers (*os.File, *bytes.Reader) and non-seekable readers (net.Conn, *gzip.Reader, http.Response.Body).

Returns:
  • *StreamDecoder: Decoder instance ready to decode values

Example

conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

dec := gen1.NewStreamDecoder(conn)

Decode

Reads and decodes one value from the stream.
func (d *StreamDecoder) Decode() (any, error)
Returns:
  • any: Decoded Go value. Same types as gen1.Decode():
      - Primitives: nil, bool, int64, float64, string, []byte
      - Containers: map[string]any, []any
      - Typed arrays: []int64, []float64, []string
      - Graph types: Node, Edge, AdjList, NodeBatch, EdgeBatch, GraphShard
  • error:
      - io.EOF: no more data available (clean stream end)
      - "unexpected EOF: incomplete record": stream ended mid-record
      - Other errors: malformed data or a security limit exceeded

Example: Basic Streaming

f, err := os.Open("data.cowrie")
if err != nil {
    log.Fatal(err)
}
defer f.Close()

dec := gen1.NewStreamDecoder(f)

for {
    value, err := dec.Decode()
    if err == io.EOF {
        break // Clean end of stream
    }
    if err != nil {
        log.Fatal(err) // Decode error
    }
    
    // Process value
    obj := value.(map[string]any)
    fmt.Printf("ID: %s\n", obj["id"])
}

Example: Network Stream

conn, err := net.Dial("tcp", "api.example.com:9000")
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

dec := gen1.NewStreamDecoder(conn)

for {
    record, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Printf("Decode error: %v", err)
        break
    }
    
    // Process streaming record
    processRecord(record)
}

Example: Compressed Stream

f, err := os.Open("archive.cowrie.gz")
if err != nil {
    log.Fatal(err)
}
defer f.Close()

gzr, err := gzip.NewReader(f)
if err != nil {
    log.Fatal(err)
}
defer gzr.Close()

dec := gen1.NewStreamDecoder(gzr)

for {
    value, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    
    processValue(value)
}

Example: HTTP Response

resp, err := http.Get("https://api.example.com/stream")
if err != nil {
    log.Fatal(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
    log.Fatalf("HTTP %d", resp.StatusCode)
}

dec := gen1.NewStreamDecoder(resp.Body)

for {
    event, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Printf("Stream error: %v", err)
        break
    }
    
    handleEvent(event)
}

Error Handling

EOF Handling

The decoder distinguishes between clean stream end and truncated data:
for {
    value, err := dec.Decode()
    if err == io.EOF {
        // Clean end: no more complete records
        fmt.Println("Stream complete")
        break
    }
    if err != nil {
        // Decode error: malformed data or incomplete record
        if strings.Contains(err.Error(), "unexpected EOF") {
            log.Println("Stream truncated mid-record")
        } else {
            log.Printf("Decode error: %v", err)
        }
        break
    }
    
    process(value)
}

Incomplete Records

The decoder buffers incomplete data automatically:
// Suppose the connection delivers only the first 50 bytes
// of a 100-byte record. Decode() buffers what has arrived and
// keeps reading from the underlying reader until the record
// is complete:
value, err := dec.Decode()
// err == nil: the decoder transparently waited for the
// remaining bytes; no partial-read handling is needed

Security Limits

Streaming decode respects the same security limits as Decode():
dec := gen1.NewStreamDecoder(untrustedConn)

for {
    value, err := dec.Decode()
    if errors.Is(err, gen1.ErrMaxArrayLen) {
        log.Println("Malicious array size detected")
        conn.Close()
        break
    }
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    
    process(value)
}

Performance Characteristics

Buffer Management

  • Initial buffer: 4KB
  • Grows automatically for larger records
  • Efficiently handles unconsumed bytes between records
  • No data loss on partial reads

Memory Efficiency

The decoder maintains a small buffer (typically 4-8KB) even when processing large datasets:
// Efficiently processes millions of records
for {
    record, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    
    // Process and discard - only one record in memory
    saveToDatabase(record)
}

Best Practices

1. Always Check io.EOF

// Good: Explicit EOF check
for {
    value, err := dec.Decode()
    if err == io.EOF {
        break // Expected end
    }
    if err != nil {
        return err // Unexpected error
    }
    process(value)
}

// Bad: Treating EOF as error
for {
    value, err := dec.Decode()
    if err != nil {
        return err // Wrong: EOF is not an error
    }
    process(value)
}

2. Reuse Decoder for Multiple Records

// Good: One decoder for entire stream
dec := gen1.NewStreamDecoder(conn)
for {
    value, err := dec.Decode()
    // ...
}

// Bad: Creating new decoder per record (loses buffered data)
for {
    dec := gen1.NewStreamDecoder(conn) // Wrong!
    value, err := dec.Decode()
    // ...
}

3. Handle Partial Reads Gracefully

The decoder handles partial reads automatically - no special handling needed:
// Network with variable packet sizes
dec := gen1.NewStreamDecoder(conn)

// Decoder buffers partial data transparently
for {
    value, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    process(value)
}

4. Combine with Context for Timeouts

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

type result struct {
    value any
    err   error
}

ch := make(chan result, 1) // buffered so the goroutine cannot leak on timeout
go func() {
    value, err := dec.Decode()
    ch <- result{value, err}
}()

select {
case res := <-ch:
    if res.err != nil {
        log.Fatal(res.err)
    }
    process(res.value)
case <-ctx.Done():
    log.Println("Decode timeout")
}

Comparison with Non-Streaming Decode

Non-Streaming (Decode)

Requires entire message in memory:
// Must read entire record first
data, err := io.ReadAll(conn)
if err != nil {
    log.Fatal(err)
}

value, err := gen1.Decode(data)
if err != nil {
    log.Fatal(err)
}
Use when:
  • Data already in memory (buffers, HTTP responses)
  • Small, fixed-size messages
  • Random access needed

Streaming (StreamDecoder)

Processes data incrementally:
// Processes records as they arrive
dec := gen1.NewStreamDecoder(conn)

for {
    value, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    process(value)
}
Use when:
  • Reading from network connections
  • Processing large files
  • Handling multiple records in sequence
  • Working with compressed streams
  • Memory efficiency is important

Common Patterns

Pattern: Streaming Pipeline

func processCowrieStream(r io.Reader) error {
    dec := gen1.NewStreamDecoder(r)
    
    for {
        value, err := dec.Decode()
        if err == io.EOF {
            return nil // Success
        }
        if err != nil {
            return fmt.Errorf("decode: %w", err)
        }
        
        if err := processRecord(value); err != nil {
            return fmt.Errorf("process: %w", err)
        }
    }
}

Pattern: Count Records

func countRecords(r io.Reader) (int, error) {
    dec := gen1.NewStreamDecoder(r)
    count := 0
    
    for {
        _, err := dec.Decode()
        if err == io.EOF {
            return count, nil
        }
        if err != nil {
            return 0, err
        }
        count++
    }
}

Pattern: Batch Processing

func processBatches(r io.Reader, batchSize int) error {
    dec := gen1.NewStreamDecoder(r)
    batch := make([]any, 0, batchSize)
    
    for {
        value, err := dec.Decode()
        if err == io.EOF {
            if len(batch) > 0 {
                processBatch(batch) // Process final partial batch
            }
            return nil
        }
        if err != nil {
            return err
        }
        
        batch = append(batch, value)
        if len(batch) >= batchSize {
            if err := processBatch(batch); err != nil {
                return err
            }
            batch = batch[:0] // Reuse slice
        }
    }
}

Pattern: Filter Stream

func filterStream(r io.Reader, w io.Writer, predicate func(any) bool) error {
    dec := gen1.NewStreamDecoder(r)
    
    for {
        value, err := dec.Decode()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        
        if predicate(value) {
            if err := gen1.EncodeTo(w, value); err != nil {
                return err
            }
        }
    }
}
