Cowrie supports two streaming protocols for efficient record-by-record processing: Gen1 (simple length-prefixed) and Gen2 (master stream with metadata and checksums).
Gen1 Stream (Legacy)
Simple length-prefixed Cowrie records, similar to JSON Lines (JSONL).
[u32 length] [cowrie bytes]
[u32 length] [cowrie bytes]
...
[0x00 0x00 0x00 0x00] // optional end marker
- Length: 4-byte little-endian uint32
- Payload: Complete Cowrie v2 document (header + data)
- End Marker: Zero length signals end of stream (optional)
StreamWriter
import (
"os"
"github.com/Neumenon/cowrie/codec"
)
// Create writer
f, err := os.Create("data.cowrie")
if err != nil {
return err
}
defer f.Close()
writer := codec.NewStreamWriter(f)
// Write records
for _, item := range items {
err := writer.Write(item)
if err != nil {
return err
}
}
// Flush buffered data
return writer.Sync()
StreamReader
// Read from file
data, _ := os.ReadFile("data.cowrie")
reader := codec.NewStreamReader(data)
// Read all records
for {
var item MyStruct
err := reader.Next(&item)
if err == io.EOF {
break
}
if err != nil {
return err
}
process(item)
}
// Or read all at once
items, err := codec.ReadAllStream[MyStruct](data)
Use Cases
- Log Shipping: Append-only event logs
- Message Queues: Kafka-like record streams
- Database Exports: Streaming table dumps
- API Pagination: Cursor-based result sets
Example: Event Log
type Event struct {
Timestamp int64
UserID string
Action string
Metadata map[string]any
}
// Write events
writer := codec.NewStreamWriter(logFile)
for _, event := range events {
writer.Write(event)
}
// Read events
reader := codec.NewStreamReader(logData)
for {
var event Event
if err := reader.Next(&event); err == io.EOF {
break
}
processEvent(event)
}
Gen2 Master Stream
Advanced streaming protocol with metadata, compression, checksums, and type routing.
Magic: 'S' 'J' 'S' 'T' (4 bytes)
Version: 0x02 (1 byte)
Flags: u8 (compression, CRC, deterministic, metadata)
HeaderLen: u16 LE (24 bytes for v2)
TypeID: u32 LE (schema fingerprint)
PayloadLen: u32 LE (compressed length)
RawLen: u32 LE (original length, 0 if not compressed)
MetaLen: u32 LE (metadata length, 0 if none)
[Metadata: cowrie bytes (if MetaLen > 0)]
[Payload: cowrie bytes or compressed]
[CRC32: u32 LE (if FlagMasterCRC set)]
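The fixed 24-byte header can be parsed with encoding/binary. The sketch below follows the field order above; the struct and function names are illustrative, not the codec package's API:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// masterHeader mirrors the 24-byte Gen2 frame header described above.
type masterHeader struct {
	Version    byte
	Flags      byte
	HeaderLen  uint16
	TypeID     uint32
	PayloadLen uint32
	RawLen     uint32
	MetaLen    uint32
}

func parseMasterHeader(b []byte) (masterHeader, error) {
	var h masterHeader
	if len(b) < 24 {
		return h, errors.New("truncated header")
	}
	if string(b[:4]) != "SJST" {
		return h, errors.New("bad magic")
	}
	h.Version = b[4]
	h.Flags = b[5]
	h.HeaderLen = binary.LittleEndian.Uint16(b[6:8])
	h.TypeID = binary.LittleEndian.Uint32(b[8:12])
	h.PayloadLen = binary.LittleEndian.Uint32(b[12:16])
	h.RawLen = binary.LittleEndian.Uint32(b[16:20])
	h.MetaLen = binary.LittleEndian.Uint32(b[20:24])
	return h, nil
}

func main() {
	hdr := []byte{
		'S', 'J', 'S', 'T', // magic
		0x02,             // version 2
		0x00,             // flags: none set
		24, 0,            // header length = 24 (LE)
		0x39, 0x30, 0, 0, // TypeID = 12345 (LE)
		5, 0, 0, 0, // payload length = 5
		0, 0, 0, 0, // raw length (0: not compressed)
		0, 0, 0, 0, // metadata length (0: none)
	}
	h, err := parseMasterHeader(hdr)
	if err != nil {
		panic(err)
	}
	fmt.Println(h.Version, h.TypeID, h.PayloadLen) // 2 12345 5
}
```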
Flags
| Bit | Flag | Meaning |
|-----|------|---------|
| 0 | Compressed | Payload is compressed |
| 1-2 | Compression type | 0=none, 1=gzip, 2=zstd |
| 3 | Deterministic | Keys sorted for reproducible encoding |
| 4 | CRC | CRC32 checksum follows payload |
| 5 | Meta | Metadata present before payload |
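The flag bits above can be tested with plain bit masks. A sketch, with constant names that are assumptions rather than the package's exported identifiers:

```go
package main

import "fmt"

// Bit masks matching the flag table above; the names are illustrative,
// not the codec package's exported constants.
const (
	flagCompressed    = 1 << 0
	compressionShift  = 1 // bits 1-2 carry the compression type
	compressionMask   = 0b11
	flagDeterministic = 1 << 3
	flagCRC           = 1 << 4
	flagMeta          = 1 << 5
)

// compressionType extracts bits 1-2: 0=none, 1=gzip, 2=zstd.
func compressionType(flags byte) int {
	return int(flags>>compressionShift) & compressionMask
}

func main() {
	// Compressed with zstd (type 2), deterministic, CRC enabled:
	flags := byte(flagCompressed | 2<<compressionShift | flagDeterministic | flagCRC)
	fmt.Println(flags&flagCompressed != 0, compressionType(flags), flags&flagMeta != 0) // true 2 false
}
```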
MasterWriter
import "github.com/Neumenon/cowrie/codec"
// Configure writer
opts := codec.DefaultMasterWriterOptions()
// opts.Deterministic = true (default)
// opts.Compression = cowrie.CompressionZstd (default)
// opts.EnableCRC = true (default)
writer := codec.NewMasterWriter(file, opts)
// Write with metadata
meta := cowrie.Object(
cowrie.Member{Key: "version", Value: cowrie.Int64(1)},
cowrie.Member{Key: "source", Value: cowrie.String("api")},
)
err := writer.WriteWithMeta(data, meta)
MasterReader
// Configure reader
opts := codec.DefaultMasterReaderOptions()
// opts.MaxDecompressedSize = 100 * 1024 * 1024 // 100MB default
// opts.AllowLegacy = true // Auto-detect Gen1 streams
reader := codec.NewMasterReader(data, opts)
// Read frames
for {
frame, err := reader.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Access frame data
fmt.Println("TypeID:", frame.TypeID)
fmt.Println("Compressed:", frame.Header.Compression)
// Access metadata (if present)
if frame.Meta != nil {
version := frame.Meta.Get("version").Int64()
fmt.Println("Meta version:", version)
}
// Access payload
payload := frame.Payload
processData(payload)
}
Example: Type Routing
// Define type handlers
handlers := map[uint32]func(*cowrie.Value) error{
12345: handleUserEvent,
67890: handleSystemEvent,
}
reader := codec.NewMasterReader(data, codec.MasterReaderOptions{
TypeHandlers: handlers,
})
for {
frame, err := reader.Next()
if err == io.EOF {
break
}
// Dispatch based on the frame's TypeID
if handler, ok := handlers[frame.TypeID]; ok {
handler(frame.Payload)
}
}
Schema Fingerprinting
Type IDs are computed using FNV-1a hash of the value's type structure:
value := cowrie.Object(
cowrie.Member{Key: "name", Value: cowrie.String("Alice")},
cowrie.Member{Key: "age", Value: cowrie.Int64(30)},
)
typeID := cowrie.SchemaFingerprint32(value) // Stable hash
Use cases:
- Type Routing: Dispatch to correct handler
- Schema Validation: Detect schema changes
- Multiplexing: Route records by type
- Schema Registry: Track schema evolution
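FNV-1a itself is available in Go's hash/fnv package. The sketch below hashes a stand-in string of field:type pairs, since the exact canonical type description the library feeds into the hash is not shown here:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnv1a32 computes the 32-bit FNV-1a hash of a canonical type
// description. A plain string of field:type pairs stands in for
// whatever the library actually serializes from the type structure.
func fnv1a32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func main() {
	a := fnv1a32("name:string,age:int64")
	b := fnv1a32("name:string,age:int64")
	fmt.Println(a == b) // true: identical schemas hash identically
}
```

The stability property is what makes the hash usable as a routing key: the same type structure always yields the same 32-bit ID.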
Compression
Master stream automatically compresses payloads ≥ 256 bytes when enabled:
opts := codec.MasterWriterOptions{
Compression: cowrie.CompressionZstd, // or CompressionGzip
}
Compression is applied when:
- Payload ≥ 256 bytes
- Compressed size < original size
Security: Decompression is size-limited to prevent bombs:
opts := codec.MasterReaderOptions{
MaxDecompressedSize: 100 * 1024 * 1024, // 100MB default
}
CRC32 Checksums
Enable data integrity verification:
opts := codec.MasterWriterOptions{
EnableCRC: true, // default
}
CRC32 (IEEE polynomial) covers the entire frame from magic through payload. The reader validates the checksum automatically and returns ErrMasterCRCMismatch on corruption.
Deterministic Encoding
Ensures reproducible output for content addressing:
opts := codec.MasterWriterOptions{
Deterministic: true, // default
}
- Object keys sorted lexicographically
- Same input → same output bytes
- Essential for caching, deduplication, Merkle trees
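The effect of key sorting is easy to demonstrate: Go map iteration order is randomized, so an encoder that skips the sort produces different bytes across runs. A sketch of the property (this is not the cowrie wire format):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// encodeSorted emits key=value pairs in lexicographic key order, so the
// same map always produces the same bytes. This mimics the guarantee
// the Deterministic option provides.
func encodeSorted(m map[string]string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys) // the crucial step: a fixed key order
	var sb strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&sb, "%s=%s;", k, m[k])
	}
	return sb.String()
}

func main() {
	m := map[string]string{"b": "2", "a": "1", "c": "3"}
	fmt.Println(encodeSorted(m)) // a=1;b=2;c=3; -- identical bytes on every run
}
```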
Comparison
| Feature | Gen1 Stream | Gen2 Master Stream |
|---------|-------------|--------------------|
| Overhead | 4 bytes | 24 bytes + optional metadata |
| Compression | No | Gzip, Zstd |
| Checksums | No | CRC32 |
| Metadata | No | Per-frame metadata |
| Type Routing | No | Schema fingerprinting |
| Legacy Support | N/A | Auto-detect Gen1 |
Column Readers (Advanced)
Gen2 supports columnar reading with column hints for efficient partial decoding.
HintCount: varint
For each hint:
Field: [len:varint][utf8 bytes]
Type: u8
ShapeLen: varint
ShapeDims: ShapeLen * varint
Flags: u8
Hints appear after header flags, before dictionary (only if FlagHasColumnHints set).
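All the variable-length fields above are unsigned varints, which encoding/binary handles directly. A round-trip sketch of one hint record; the columnHint struct and function names are illustrative, not the codec API:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// columnHint mirrors one hint in the layout above:
// [len:varint][field bytes] [type:u8] [shapeLen:varint] [dims...] [flags:u8]
type columnHint struct {
	Field string
	Type  byte
	Shape []uint64
	Flags byte
}

func encodeHint(h columnHint) []byte {
	out := binary.AppendUvarint(nil, uint64(len(h.Field)))
	out = append(out, h.Field...)
	out = append(out, h.Type)
	out = binary.AppendUvarint(out, uint64(len(h.Shape)))
	for _, d := range h.Shape {
		out = binary.AppendUvarint(out, d)
	}
	return append(out, h.Flags)
}

func decodeHint(b []byte) columnHint {
	n, w := binary.Uvarint(b) // field name length
	b = b[w:]
	h := columnHint{Field: string(b[:n])}
	b = b[n:]
	h.Type = b[0]
	b = b[1:]
	dims, w := binary.Uvarint(b) // number of shape dimensions
	b = b[w:]
	for i := uint64(0); i < dims; i++ {
		d, w := binary.Uvarint(b)
		h.Shape = append(h.Shape, d)
		b = b[w:]
	}
	h.Flags = b[0]
	return h
}

func main() {
	hint := columnHint{Field: "price", Type: 7, Shape: []uint64{1024}, Flags: 1}
	round := decodeHint(encodeHint(hint))
	fmt.Println(round.Field, round.Type, round.Shape[0], round.Flags) // price 7 1024 1
}
```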
Use Cases
- Parquet-like Access: Read specific columns without full decode
- Query Push-down: Filter on indexed columns
- Partial Hydration: Load only needed fields
- Wide Tables: Skip unused columns in large records
Security Limits
opts := cowrie.DecodeOptions{
MaxHintCount: 10_000, // Max column hints
}
See Security Limits for details.
Performance Tips
Gen1 Stream
- Batch Writes: Group small records to reduce syscalls
var buffer bytes.Buffer
writer := codec.NewStreamWriter(&buffer)
for _, item := range batch {
writer.Write(item)
}
file.Write(buffer.Bytes()) // Single write
- Streaming Read: Process records without loading entire file
// Good: Streaming
reader := codec.NewStreamReader(data)
for {
var item Item
if err := reader.Next(&item); err == io.EOF {
break
}
process(item) // Constant memory
}
// Avoid: loading everything at once
items, _ := codec.ReadAllStream[Item](data) // Allocates a slice of all records
- Zero-Copy Decoding: Reuse structs
var item Item
reader := codec.NewStreamReader(data)
for {
err := reader.Next(&item)
if err == io.EOF {
break
}
process(item)
// 'item' reused on next iteration
}
Gen2 Master Stream
- Enable Compression: 30-70% size reduction for mixed data
opts := codec.MasterWriterOptions{
Compression: cowrie.CompressionZstd,
}
- Deterministic for Dedup: Cache compressed frames by hash
opts := codec.MasterWriterOptions{
Deterministic: true,
Compression: cowrie.CompressionZstd,
}
frame, _ := encodeFrame(data, opts)
hash := sha256.Sum256(frame)
cache[hash] = frame // Deduplication
- Type-Based Routing: Avoid decoding unneeded frames
frame, _ := reader.Next()
if frame.TypeID == expectedTypeID {
processPayload(frame.Payload)
} else {
skipPayload(frame)
}
- Metadata for Filtering: Skip frames without full decode
for {
frame, _ := reader.Next()
// Check metadata first
if frame.Meta != nil {
partition := frame.Meta.Get("partition").Int64()
if partition != targetPartition {
continue // Skip without decoding payload
}
}
// Only decode relevant frames
process(frame.Payload)
}
Real-World Examples
Event Streaming (Gen1)
// Producer
func streamEvents(events []Event, w io.Writer) error {
writer := codec.NewStreamWriter(w)
for _, event := range events {
if err := writer.Write(event); err != nil {
return err
}
}
return writer.Sync()
}
// Consumer
func consumeEvents(r io.Reader) error {
data, err := io.ReadAll(r)
if err != nil {
return err
}
reader := codec.NewStreamReader(data)
for {
var event Event
err := reader.Next(&event)
if err == io.EOF {
break
}
if err != nil {
return err
}
handleEvent(event)
}
return nil
}
Multi-Tenant Data Stream (Gen2)
// Write with tenant metadata
opts := codec.DefaultMasterWriterOptions()
writer := codec.NewMasterWriter(stream, opts)
for _, record := range records {
meta := cowrie.Object(
cowrie.Member{Key: "tenant", Value: cowrie.String(record.TenantID)},
cowrie.Member{Key: "timestamp", Value: cowrie.Int64(time.Now().Unix())},
)
writer.WriteWithMeta(record.Data, meta)
}
// Read filtered by tenant
reader := codec.NewMasterReader(data, codec.DefaultMasterReaderOptions())
targetTenant := "acme_corp"
for {
frame, err := reader.Next()
if err == io.EOF {
break
}
// Filter by metadata (no payload decode!)
if frame.Meta != nil {
tenant := frame.Meta.Get("tenant").String()
if tenant != targetTenant {
continue
}
}
// Process only relevant tenant's data
process(frame.Payload)
}
Error Handling
Gen1 Stream Errors
reader := codec.NewStreamReader(data)
for {
var item Item
err := reader.Next(&item)
if err == io.EOF {
break // Normal end
}
if err != nil {
// Truncated frame, decode error, etc.
log.Printf("Stream error at position %d: %v", reader.Position(), err)
return err
}
}
Gen2 Master Stream Errors
reader := codec.NewMasterReader(data, opts)
for {
frame, err := reader.Next()
switch err {
case nil:
// Success
case io.EOF:
// Normal end
case codec.ErrMasterCRCMismatch:
// Corrupted frame
log.Printf("CRC mismatch at offset %d", reader.Position())
case codec.ErrMasterTruncated:
// Incomplete frame
log.Printf("Truncated frame at offset %d", reader.Position())
case cowrie.ErrDecompressedTooLarge:
// Decompression bomb detected
log.Printf("Suspicious payload at offset %d", reader.Position())
default:
// Other errors
return err
}
}