Platformatic Job Queue serializes payloads and results to a binary format (a Buffer) for storage and deserializes them when they are read back. JSON serialization is the default, but you can implement a custom serializer for better performance or to support types that JSON cannot represent.

Serde Interface

The Serde<T> interface defines the contract for serializers.
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}
  • serialize: (value: T) => Buffer (required). Converts a value to a Buffer for storage.
  • deserialize: (buffer: Buffer) => T (required). Converts a Buffer back to the original value.
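As a minimal sketch of this contract, the serde below stores plain strings as UTF-8 bytes. The `Serde<T>` interface is redeclared locally so the snippet runs on its own; `Utf8StringSerde` and `roundTrip` are hypothetical names used for illustration and are not part of the library.

```typescript
// Local copy of the interface shown above, so the sketch is self-contained.
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

// Hypothetical serde: stores strings as raw UTF-8 bytes.
class Utf8StringSerde implements Serde<string> {
  serialize(value: string): Buffer {
    return Buffer.from(value, 'utf8');
  }

  deserialize(buffer: Buffer): string {
    return buffer.toString('utf8');
  }
}

// Hypothetical helper: serialize, then deserialize. Handy in unit tests.
function roundTrip<T>(serde: Serde<T>, value: T): T {
  return serde.deserialize(serde.serialize(value));
}

const restored = roundTrip(new Utf8StringSerde(), 'héllo');
```

A round-trip check like this is a cheap way to confirm that `deserialize` really is the inverse of `serialize` before wiring a serde into a queue.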

Built-in Serializers

JsonSerde

Default JSON-based serializer.
import { Queue, JsonSerde, createJsonSerde } from '@platformatic/job-queue'

// Using the class directly
const classSerde = new JsonSerde<MyPayload>();

// Using the factory function (equivalent to the line above)
const factorySerde = createJsonSerde<MyPayload>();

// Explicit configuration (JSON is the default)
const queue = new Queue({
  storage,
  payloadSerde: createJsonSerde(),
  resultSerde: createJsonSerde()
});
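Limitations (assuming standard JSON.stringify and JSON.parse semantics):
  • Functions and symbols are silently omitted, and circular references throw an error
  • Special types do not round-trip: Date becomes an ISO string, while RegExp, Map, and Set become empty objects
  • Less efficient for binary data (a Buffer is expanded into a JSON array of byte values)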
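If you want to stay on JSON but need some of these types to survive a round trip, one option is a replacer/reviver pair that wraps them in tagged objects. The sketch below is an illustration, not something the library provides: `TaggedJsonSerde` and the `$type` tag are hypothetical, the `Serde<T>` interface is redeclared locally, and payloads that already use a `$type` key would collide with the tags.

```typescript
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

// Hypothetical serde: JSON with tagged wrappers for Date, Map, and Set.
class TaggedJsonSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    const json = JSON.stringify(value, function (this: any, key: string, val: any) {
      // `val` has already gone through toJSON (a Date is a string by now),
      // so inspect the original value on the holder object instead.
      const raw = this[key];
      if (raw instanceof Date) return { $type: 'Date', value: raw.toISOString() };
      if (raw instanceof Map) return { $type: 'Map', value: Array.from(raw.entries()) };
      if (raw instanceof Set) return { $type: 'Set', value: Array.from(raw.values()) };
      return val;
    });
    return Buffer.from(json, 'utf8');
  }

  deserialize(buffer: Buffer): T {
    // The reviver runs bottom-up, so nested tagged values are restored first.
    return JSON.parse(buffer.toString('utf8'), (_key, val) => {
      if (val !== null && typeof val === 'object' && typeof val.$type === 'string') {
        if (val.$type === 'Date') return new Date(val.value);
        if (val.$type === 'Map') return new Map(val.value);
        if (val.$type === 'Set') return new Set(val.value);
      }
      return val;
    }) as T;
  }
}

interface ReportJob {
  when: Date
  tags: Set<string>
  counts: Map<string, number>
}

const taggedSerde = new TaggedJsonSerde<ReportJob>();
const restoredJob = taggedSerde.deserialize(
  taggedSerde.serialize({
    when: new Date(0),
    tags: new Set(['urgent']),
    counts: new Map([['retries', 3]])
  })
);
```

The stored bytes are still readable JSON, which keeps the debugging benefit, at the cost of a slightly larger payload for each tagged value.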

Custom Serializers

MessagePack Example

MessagePack is a binary serialization format that’s more compact than JSON.
import { Queue, type Serde } from '@platformatic/job-queue'
import * as msgpack from '@msgpack/msgpack'

class MessagePackSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    const encoded = msgpack.encode(value);
    return Buffer.from(encoded);
  }

  deserialize(buffer: Buffer): T {
    return msgpack.decode(buffer) as T;
  }
}

// Use with queue
const queue = new Queue({
  storage,
  payloadSerde: new MessagePackSerde(),
  resultSerde: new MessagePackSerde()
});
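Benefits:
  • Smaller payload size (often 20-50% smaller than JSON, depending on the shape of the data)
  • Typically faster serialization/deserialization
  • Supports binary data (note that @msgpack/msgpack decodes binary fields as Uint8Array, not Buffer)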
Install:
npm install @msgpack/msgpack

Protocol Buffers Example

Protocol Buffers provide strongly-typed, efficient serialization.
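import { Queue, type Serde } from '@platformatic/job-queue'
import { Message } from './generated/proto'

// Assumes google-protobuf-style generated classes, where serializeBinary()
// is an instance method and deserializeBinary() is a static method.
interface MessageClass<T extends Message> {
  deserializeBinary(bytes: Uint8Array): T
}

class ProtobufSerde<T extends Message> implements Serde<T> {
  constructor(private messageClass: MessageClass<T>) {}

  serialize(value: T): Buffer {
    return Buffer.from(value.serializeBinary());
  }

  deserialize(buffer: Buffer): T {
    // Buffer is a Uint8Array subclass, so it can be passed directly
    return this.messageClass.deserializeBinary(buffer);
  }
}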

// Define your proto schema (job.proto)
// message JobPayload {
//   string user_id = 1;
//   string action = 2;
// }
//
// message JobResult {
//   bool success = 1;
//   string message = 2;
// }

import { JobPayload, JobResult } from './generated/job_pb'

const queue = new Queue<JobPayload, JobResult>({
  storage,
  payloadSerde: new ProtobufSerde(JobPayload),
  resultSerde: new ProtobufSerde(JobResult)
});

// Enqueue with typed payload
const payload = new JobPayload();
payload.setUserId('user-123');
payload.setAction('send-email');

await queue.enqueue(payload);
Benefits:
  • Smallest payload size
  • Strong typing
  • Schema evolution support
  • Language-agnostic

Custom Binary Format

Implement your own binary format for specific use cases.
import { Queue, type Serde } from '@platformatic/job-queue'

interface ImagePayload {
  id: string
  format: 'jpeg' | 'png' | 'webp'
  data: Buffer
  width: number
  height: number
}

class ImageSerde implements Serde<ImagePayload> {
  serialize(value: ImagePayload): Buffer {
    // Custom binary format (15-byte fixed header, then variable-length fields):
    // [id length: 2 bytes][format: 1 byte][width: 4 bytes][height: 4 bytes]
    // [data length: 4 bytes][id: variable][data: variable]
    
    const idBuffer = Buffer.from(value.id, 'utf8');
    const formatByte = value.format === 'jpeg' ? 0 : 
                       value.format === 'png' ? 1 : 2;
    
    const header = Buffer.alloc(15);
    header.writeUInt16BE(idBuffer.length, 0);
    header.writeUInt8(formatByte, 2);
    header.writeUInt32BE(value.width, 3);
    header.writeUInt32BE(value.height, 7);
    header.writeUInt32BE(value.data.length, 11);
    
    return Buffer.concat([header, idBuffer, value.data]);
  }

  deserialize(buffer: Buffer): ImagePayload {
    const idLength = buffer.readUInt16BE(0);
    const formatByte = buffer.readUInt8(2);
    const width = buffer.readUInt32BE(3);
    const height = buffer.readUInt32BE(7);
    const dataLength = buffer.readUInt32BE(11);
    
    const id = buffer.toString('utf8', 15, 15 + idLength);
    const format = formatByte === 0 ? 'jpeg' : 
                   formatByte === 1 ? 'png' : 'webp';
    // subarray() returns a view without copying (Buffer.slice is deprecated)
    const data = buffer.subarray(15 + idLength, 15 + idLength + dataLength);
    
    return { id, format, data, width, height };
  }
}

const queue = new Queue<ImagePayload, void>({
  storage,
  payloadSerde: new ImageSerde()
});

queue.process(async (job) => {
  const { id, format, data, width, height } = job.payload;
  await processImage(data, format, width, height);
});
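The deserializer above trusts the lengths stored in the header. A truncated or corrupted buffer would then produce a silently wrong payload or an unhelpful RangeError. As a hedged sketch, a validation step could run before decoding, assuming the same layout of a 15-byte header followed by the id and the image data. `assertValidImageFrame` is a hypothetical helper, not part of the library.

```typescript
// 2 (id length) + 1 (format) + 4 (width) + 4 (height) + 4 (data length)
const HEADER_SIZE = 15;

// Hypothetical helper: throws if the buffer cannot be a valid frame in this layout.
function assertValidImageFrame(buffer: Buffer): void {
  if (buffer.length < HEADER_SIZE) {
    throw new RangeError(`Frame too short: ${buffer.length} bytes, need at least ${HEADER_SIZE}`);
  }

  const idLength = buffer.readUInt16BE(0);
  const formatByte = buffer.readUInt8(2);
  const dataLength = buffer.readUInt32BE(11);

  // Only 0 (jpeg), 1 (png), and 2 (webp) are defined by the format.
  if (formatByte > 2) {
    throw new RangeError(`Unknown format byte: ${formatByte}`);
  }

  const expectedLength = HEADER_SIZE + idLength + dataLength;
  if (buffer.length !== expectedLength) {
    throw new RangeError(`Frame length mismatch: got ${buffer.length}, header declares ${expectedLength}`);
  }
}

// Build a small valid frame by hand: id "ab", format png, 10x10, 3 data bytes.
const exampleHeader = Buffer.alloc(HEADER_SIZE);
exampleHeader.writeUInt16BE(2, 0);
exampleHeader.writeUInt8(1, 2);
exampleHeader.writeUInt32BE(10, 3);
exampleHeader.writeUInt32BE(10, 7);
exampleHeader.writeUInt32BE(3, 11);
const exampleFrame = Buffer.concat([exampleHeader, Buffer.from('ab', 'utf8'), Buffer.from([1, 2, 3])]);

assertValidImageFrame(exampleFrame); // passes silently
```

Calling a check like this at the top of `deserialize` turns corrupted storage entries into explicit errors rather than malformed jobs.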

Compression Example

Wrap serializers with compression for large payloads.
import { Queue, createJsonSerde, type Serde } from '@platformatic/job-queue'
import { gzipSync, gunzipSync } from 'zlib'

class CompressedSerde<T> implements Serde<T> {
  constructor(private innerSerde: Serde<T>) {}

  serialize(value: T): Buffer {
    const serialized = this.innerSerde.serialize(value);
    return gzipSync(serialized);
  }

  deserialize(buffer: Buffer): T {
    const decompressed = gunzipSync(buffer);
    return this.innerSerde.deserialize(decompressed);
  }
}

// Wrap JSON serializer with compression
const queue = new Queue({
  storage,
  payloadSerde: new CompressedSerde(createJsonSerde()),
  resultSerde: new CompressedSerde(createJsonSerde())
});
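Gzip adds a fixed overhead, so compressing every payload can make small ones larger and slower. One possible refinement, sketched below under stated assumptions, is to compress only above a size threshold and record the choice in a leading flag byte. `ThresholdCompressedSerde`, the flag values, and the 1024-byte default are all hypothetical, and `jsonSerde` is a local stand-in for `createJsonSerde()` so the snippet runs on its own.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib'

interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

const FLAG_RAW = 0;
const FLAG_GZIP = 1;

// Hypothetical wrapper: gzip only when the serialized form reaches the threshold.
class ThresholdCompressedSerde<T> implements Serde<T> {
  private readonly inner: Serde<T>;
  private readonly thresholdBytes: number;

  constructor(inner: Serde<T>, thresholdBytes = 1024) {
    this.inner = inner;
    this.thresholdBytes = thresholdBytes;
  }

  serialize(value: T): Buffer {
    const serialized = this.inner.serialize(value);
    if (serialized.length < this.thresholdBytes) {
      return Buffer.concat([Buffer.from([FLAG_RAW]), serialized]);
    }
    return Buffer.concat([Buffer.from([FLAG_GZIP]), gzipSync(serialized)]);
  }

  deserialize(buffer: Buffer): T {
    const flag = buffer.readUInt8(0);
    const body = buffer.subarray(1);
    if (flag === FLAG_RAW) return this.inner.deserialize(body);
    if (flag === FLAG_GZIP) return this.inner.deserialize(gunzipSync(body));
    throw new Error(`Unknown compression flag: ${flag}`);
  }
}

// Stand-in for createJsonSerde(), so the sketch has no external dependencies.
const jsonSerde: Serde<unknown> = {
  serialize: (value) => Buffer.from(JSON.stringify(value), 'utf8'),
  deserialize: (buffer) => JSON.parse(buffer.toString('utf8'))
};

const thresholdSerde = new ThresholdCompressedSerde(jsonSerde);
const smallBytes = thresholdSerde.serialize({ id: 1 });
const largeBytes = thresholdSerde.serialize({ text: 'a'.repeat(5000) });
```

The flag byte costs one byte per payload but lets the reader decode both forms, which also makes it possible to change the threshold later without breaking entries already in storage.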

Encryption Example

Encrypt sensitive data in payloads and results.
import { Queue, createJsonSerde, type Serde } from '@platformatic/job-queue'
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto'

class EncryptedSerde<T> implements Serde<T> {
  constructor(
    private innerSerde: Serde<T>,
    private key: Buffer // 32 bytes for AES-256
  ) {}

  serialize(value: T): Buffer {
    const serialized = this.innerSerde.serialize(value);
    const iv = randomBytes(16);
    const cipher = createCipheriv('aes-256-cbc', this.key, iv);
    
    const encrypted = Buffer.concat([
      cipher.update(serialized),
      cipher.final()
    ]);
    
    // Prepend IV for decryption
    return Buffer.concat([iv, encrypted]);
  }

  deserialize(buffer: Buffer): T {
    // subarray() returns views without copying (Buffer.slice is deprecated)
    const iv = buffer.subarray(0, 16);
    const encrypted = buffer.subarray(16);
    
    const decipher = createDecipheriv('aes-256-cbc', this.key, iv);
    const decrypted = Buffer.concat([
      decipher.update(encrypted),
      decipher.final()
    ]);
    
    return this.innerSerde.deserialize(decrypted);
  }
}

// Use with sensitive data
const encryptionKey = Buffer.from(process.env.ENCRYPTION_KEY!, 'hex');

const queue = new Queue({
  storage,
  payloadSerde: new EncryptedSerde(
    createJsonSerde(),
    encryptionKey
  )
});

// Payloads are now encrypted in storage
await queue.enqueue({
  creditCard: '4111-1111-1111-1111',
  ssn: '123-45-6789'
});
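AES-CBC on its own provides confidentiality but does not detect tampering with the stored bytes. If that matters for your data, an authenticated mode such as AES-256-GCM is a common alternative. The following is a sketch under stated assumptions rather than a vetted implementation: `AuthenticatedSerde` and its byte layout are hypothetical, and `jsonSerde` is a local stand-in for `createJsonSerde()`.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto'

interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

const IV_LENGTH = 12;  // commonly recommended nonce size for GCM
const TAG_LENGTH = 16; // default GCM authentication tag size in Node.js

// Hypothetical wrapper: AES-256-GCM with layout [iv][auth tag][ciphertext].
class AuthenticatedSerde<T> implements Serde<T> {
  private readonly inner: Serde<T>;
  private readonly key: Buffer;

  constructor(inner: Serde<T>, key: Buffer) {
    if (key.length !== 32) throw new RangeError('AES-256 requires a 32-byte key');
    this.inner = inner;
    this.key = key;
  }

  serialize(value: T): Buffer {
    const iv = randomBytes(IV_LENGTH); // must be unique per message for a given key
    const cipher = createCipheriv('aes-256-gcm', this.key, iv);
    const encrypted = Buffer.concat([cipher.update(this.inner.serialize(value)), cipher.final()]);
    // The tag is only available after final() has been called.
    return Buffer.concat([iv, cipher.getAuthTag(), encrypted]);
  }

  deserialize(buffer: Buffer): T {
    const iv = buffer.subarray(0, IV_LENGTH);
    const tag = buffer.subarray(IV_LENGTH, IV_LENGTH + TAG_LENGTH);
    const encrypted = buffer.subarray(IV_LENGTH + TAG_LENGTH);

    const decipher = createDecipheriv('aes-256-gcm', this.key, iv);
    decipher.setAuthTag(tag);
    // final() throws if the ciphertext or the tag has been modified.
    const decrypted = Buffer.concat([decipher.update(encrypted), decipher.final()]);
    return this.inner.deserialize(decrypted);
  }
}

// Stand-in for createJsonSerde(), so the sketch has no external dependencies.
const jsonSerde: Serde<unknown> = {
  serialize: (value) => Buffer.from(JSON.stringify(value), 'utf8'),
  deserialize: (buffer) => JSON.parse(buffer.toString('utf8'))
};

const demoKey = randomBytes(32); // throwaway key for the demonstration only
const authSerde = new AuthenticatedSerde(jsonSerde, demoKey);
const sealed = authSerde.serialize({ ssn: '123-45-6789' });
const opened = authSerde.deserialize(sealed) as { ssn: string };
```

Checking the key length in the constructor surfaces a misconfigured environment variable at startup instead of at the first enqueue.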

Separate Payload and Result Serializers

Use different serializers for payloads and results.
import { MessagePackSerde } from './msgpack-serde'
import { Queue, createJsonSerde } from '@platformatic/job-queue'

interface BinaryPayload {
  imageData: Buffer
  metadata: Record<string, any>
}

interface SimpleResult {
  success: boolean
  url: string
}

const queue = new Queue<BinaryPayload, SimpleResult>({
  storage,
  // Use MessagePack for binary payload
  payloadSerde: new MessagePackSerde(),
  // Use JSON for simple result
  resultSerde: createJsonSerde()
});

queue.process(async (job) => {
  const { imageData, metadata } = job.payload;
  const url = await uploadImage(imageData, metadata);
  return { success: true, url };
});

const result = await queue.enqueueAndWait(
  {
    imageData: Buffer.from('...'),
    metadata: { userId: '123' }
  },
  { timeout: 30000 }
);

console.log('Uploaded to:', result.url);

Performance Considerations

Benchmarking Serializers

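import { performance } from 'perf_hooks'
import { createJsonSerde, type Serde } from '@platformatic/job-queue'
import { MessagePackSerde } from './msgpack-serde'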

function benchmarkSerde<T>(serde: Serde<T>, value: T, iterations = 10000) {
  // Serialize benchmark
  const serializeStart = performance.now();
  let totalSize = 0;
  
  for (let i = 0; i < iterations; i++) {
    const buffer = serde.serialize(value);
    totalSize += buffer.length;
  }
  
  const serializeTime = performance.now() - serializeStart;
  
  // Deserialize benchmark
  const buffer = serde.serialize(value);
  const deserializeStart = performance.now();
  
  for (let i = 0; i < iterations; i++) {
    serde.deserialize(buffer);
  }
  
  const deserializeTime = performance.now() - deserializeStart;
  
  return {
    serializeMs: serializeTime,
    deserializeMs: deserializeTime,
    avgSize: totalSize / iterations,
    serializeOpsPerSec: (iterations / serializeTime) * 1000,
    deserializeOpsPerSec: (iterations / deserializeTime) * 1000
  };
}

const payload = {
  id: 'job-123',
  data: { /* complex object */ },
  timestamp: Date.now()
};

console.log('JSON:', benchmarkSerde(createJsonSerde(), payload));
console.log('MessagePack:', benchmarkSerde(new MessagePackSerde(), payload));

Choosing a Serializer

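| Serializer       | Size          | Speed   | Use Case                      |
|------------------|---------------|---------|-------------------------------|
| JSON             | Large         | Medium  | Simple objects, debugging     |
| MessagePack      | Small         | Fast    | General purpose, binary data  |
| Protocol Buffers | Smallest      | Fastest | High throughput, typed schemas |
| Custom Binary    | Smallest      | Fastest | Specialized data formats      |
| Compressed JSON  | Very Small    | Slow    | Large text payloads           |
| Encrypted        | Same as inner | Slowest | Sensitive data                |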
Recommendations:
  • Development: Use JSON for easy debugging
  • Production: Use MessagePack for better performance
  • High scale: Use Protocol Buffers or custom binary
  • Large payloads: Add compression
  • Sensitive data: Add encryption
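When a payload needs both compression and encryption, the order matters: ciphertext looks random and barely compresses, so compression should run first. With the wrappers from this page that means nesting them as `new EncryptedSerde(new CompressedSerde(createJsonSerde()), key)`, since the outer serde runs last on serialize. The sketch below compares the two orders by size only. The `encrypt` helper and the sample data are hypothetical, and the helper mirrors the AES-256-CBC scheme from the encryption example.

```typescript
import { createCipheriv, randomBytes } from 'node:crypto'
import { gzipSync } from 'node:zlib'

// Same scheme as the encryption example: AES-256-CBC with the IV prepended.
function encrypt(data: Buffer, key: Buffer): Buffer {
  const iv = randomBytes(16);
  const cipher = createCipheriv('aes-256-cbc', key, iv);
  return Buffer.concat([iv, cipher.update(data), cipher.final()]);
}

const sizeDemoKey = randomBytes(32); // throwaway key for the demonstration only
const plain = Buffer.from(
  JSON.stringify({ log: 'retrying request... '.repeat(500) }),
  'utf8'
);

// Compress first, then encrypt: the repetitive text shrinks before it is hidden.
const compressThenEncrypt = encrypt(gzipSync(plain), sizeDemoKey).length;

// Encrypt first, then compress: gzip sees high-entropy bytes and cannot shrink them.
const encryptThenCompress = gzipSync(encrypt(plain, sizeDemoKey)).length;

console.log({ plain: plain.length, compressThenEncrypt, encryptThenCompress });
```

On repetitive text like this, the compress-then-encrypt result is a small fraction of the input, while the encrypt-then-compress result ends up slightly larger than the input.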
