Platformatic Job Queue uses serialization to convert payloads and results to/from binary format for storage. By default, JSON serialization is used, but you can implement custom serializers for better performance or to support non-JSON-serializable types.
Serde Interface
The Serde<T> interface defines the contract for serializers.
interface Serde<T> {
serialize(value: T): Buffer
deserialize(buffer: Buffer): T
}
- serialize: (value: T) => Buffer (required). Converts a value to a Buffer for storage.
- deserialize: (buffer: Buffer) => T (required). Converts a Buffer back to the original value.
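As a minimal sketch of what satisfying this contract looks like, the snippet below declares a local copy of the interface (so it runs without the library) and implements it for plain string payloads. The names `Utf8Serde` and `roundTrip` are illustrative assumptions and are not part of the library.

```typescript
// Local copy of the interface shown above, so the sketch runs on its own.
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

// Hypothetical serializer for plain string payloads (not part of the library).
class Utf8Serde implements Serde<string> {
  serialize(value: string): Buffer {
    return Buffer.from(value, 'utf8');
  }

  deserialize(buffer: Buffer): string {
    return buffer.toString('utf8');
  }
}

// Serializes and immediately deserializes a value. Useful in unit tests
// to confirm that a serde hands back what it was given.
function roundTrip<T>(serde: Serde<T>, value: T): T {
  return serde.deserialize(serde.serialize(value));
}

const restored = roundTrip(new Utf8Serde(), 'send-email');
console.log(restored === 'send-email'); // true
```

A round-trip check like this is a cheap first test for any custom serializer before wiring it into a queue.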
Built-in Serializers
JsonSerde
Default JSON-based serializer.
import { Queue, JsonSerde, createJsonSerde } from '@platformatic/job-queue'
// Using the class directly
const serdeFromClass = new JsonSerde<MyPayload>();
// Using the factory function
const serdeFromFactory = createJsonSerde<MyPayload>();
// Explicit configuration (JSON is the default)
const queue = new Queue({
storage,
payloadSerde: createJsonSerde(),
resultSerde: createJsonSerde()
});
Limitations:
- Cannot serialize functions, symbols, or circular references
- No support for special types: Date values become ISO strings, while RegExp, Map, and Set become empty objects
- Less efficient for binary data
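The sketch below shows what a plain `JSON.stringify` / `JSON.parse` round trip does to several of these types. It uses only standard JavaScript behavior and does not call the library; the object shape is made up for illustration.

```typescript
// Values that JSON cannot represent faithfully.
const original = {
  createdAt: new Date('2024-01-01T00:00:00.000Z'),
  tags: new Set(['a', 'b']),
  lookup: new Map([['k', 1]]),
  pattern: /abc/g,
  callback: () => 1
};

const restored = JSON.parse(JSON.stringify(original));

console.log(typeof restored.createdAt);  // string (ISO 8601), not a Date
console.log(restored.tags);              // {}
console.log(restored.lookup);            // {}
console.log(restored.pattern);           // {}
console.log('callback' in restored);     // false, function properties are dropped

// Circular references do not degrade silently: stringify throws.
const circular: { self?: unknown } = {};
circular.self = circular;
let circularError = '';
try {
  JSON.stringify(circular);
} catch (err) {
  circularError = (err as Error).name; // TypeError
}
```

If a payload needs any of these types, convert them explicitly before enqueueing (for example, a Map to an array of entries) or switch to a serializer that supports them.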
Custom Serializers
MessagePack Example
MessagePack is a binary serialization format that’s more compact than JSON.
import { Queue, type Serde } from '@platformatic/job-queue'
import * as msgpack from '@msgpack/msgpack'
class MessagePackSerde<T> implements Serde<T> {
serialize(value: T): Buffer {
const encoded = msgpack.encode(value);
return Buffer.from(encoded);
}
deserialize(buffer: Buffer): T {
return msgpack.decode(buffer) as T;
}
}
// Use with queue
const queue = new Queue({
storage,
payloadSerde: new MessagePackSerde(),
resultSerde: new MessagePackSerde()
});
Benefits:
- Smaller payloads (often 20-50% smaller than JSON, depending on the data)
- Often faster serialization/deserialization, though results vary by payload shape and runtime
- Supports binary data
Install:
npm install @msgpack/msgpack
Protocol Buffers Example
Protocol Buffers provide strongly-typed, efficient serialization.
import { Queue, type Serde } from '@platformatic/job-queue'
import { Message } from './generated/proto'
// With google-protobuf generated code, deserializeBinary is a static method on
// the message class, so the serde takes the class itself instead of an instance.
interface MessageClass<T> {
  deserializeBinary(bytes: Uint8Array): T
}
class ProtobufSerde<T extends Message> implements Serde<T> {
  constructor(private messageClass: MessageClass<T>) {}
  serialize(value: T): Buffer {
    return Buffer.from(value.serializeBinary());
  }
  deserialize(buffer: Buffer): T {
    return this.messageClass.deserializeBinary(new Uint8Array(buffer));
  }
}
// Define your proto schema (job.proto)
// message JobPayload {
// string user_id = 1;
// string action = 2;
// }
//
// message JobResult {
// bool success = 1;
// string message = 2;
// }
import { JobPayload, JobResult } from './generated/job_pb'
const queue = new Queue<JobPayload, JobResult>({
storage,
payloadSerde: new ProtobufSerde(JobPayload),
resultSerde: new ProtobufSerde(JobResult)
});
// Enqueue with typed payload
const payload = new JobPayload();
payload.setUserId('user-123');
payload.setAction('send-email');
await queue.enqueue(payload);
Benefits:
- Smallest payload size
- Strong typing
- Schema evolution support
- Language-agnostic
Custom Binary Format Example
Implement your own binary format for specific use cases.
import { Queue, type Serde } from '@platformatic/job-queue'
interface ImagePayload {
id: string
format: 'jpeg' | 'png' | 'webp'
data: Buffer
width: number
height: number
}
class ImageSerde implements Serde<ImagePayload> {
serialize(value: ImagePayload): Buffer {
// Custom binary format (fixed 15-byte header, then variable-length fields):
// [id length: 2 bytes][format: 1 byte][width: 4 bytes][height: 4 bytes]
// [data length: 4 bytes][id: variable][data: variable]
const idBuffer = Buffer.from(value.id, 'utf8');
const formatByte = value.format === 'jpeg' ? 0 :
value.format === 'png' ? 1 : 2;
const header = Buffer.alloc(15);
header.writeUInt16BE(idBuffer.length, 0);
header.writeUInt8(formatByte, 2);
header.writeUInt32BE(value.width, 3);
header.writeUInt32BE(value.height, 7);
header.writeUInt32BE(value.data.length, 11);
return Buffer.concat([header, idBuffer, value.data]);
}
deserialize(buffer: Buffer): ImagePayload {
const idLength = buffer.readUInt16BE(0);
const formatByte = buffer.readUInt8(2);
const width = buffer.readUInt32BE(3);
const height = buffer.readUInt32BE(7);
const dataLength = buffer.readUInt32BE(11);
const id = buffer.toString('utf8', 15, 15 + idLength);
const format = formatByte === 0 ? 'jpeg' :
formatByte === 1 ? 'png' : 'webp';
const data = buffer.subarray(15 + idLength, 15 + idLength + dataLength);
return { id, format, data, width, height };
}
}
const queue = new Queue<ImagePayload, void>({
storage,
payloadSerde: new ImageSerde()
});
queue.process(async (job) => {
const { id, format, data, width, height } = job.payload;
await processImage(data, format, width, height);
});
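A hand-written format has no built-in validation, so a truncated or corrupted buffer can surface as wrong data instead of a clear error. The guard below is a hypothetical sketch that assumes the 15-byte header layout used by ImageSerde above; `assertCompleteImageBuffer` is an illustrative name, not a library function.

```typescript
const HEADER_SIZE = 15;

// Hypothetical guard for the header layout used by ImageSerde:
// [id length: 2][format: 1][width: 4][height: 4][data length: 4]
function assertCompleteImageBuffer(buffer: Buffer): void {
  if (buffer.length < HEADER_SIZE) {
    throw new RangeError(
      `Expected at least ${HEADER_SIZE} header bytes, got ${buffer.length}`
    );
  }
  const idLength = buffer.readUInt16BE(0);
  const dataLength = buffer.readUInt32BE(11);
  const expected = HEADER_SIZE + idLength + dataLength;
  if (buffer.length !== expected) {
    throw new RangeError(`Expected ${expected} bytes, got ${buffer.length}`);
  }
}

// Build a well-formed buffer: 3-byte id and 4 bytes of image data.
const header = Buffer.alloc(HEADER_SIZE);
header.writeUInt16BE(3, 0);  // id length
header.writeUInt32BE(4, 11); // data length
const complete = Buffer.concat([header, Buffer.from('abc'), Buffer.alloc(4)]);

assertCompleteImageBuffer(complete); // passes

let truncatedRejected = false;
try {
  assertCompleteImageBuffer(complete.subarray(0, 20)); // two bytes missing
} catch (err) {
  truncatedRejected = err instanceof RangeError;
}
```

Calling a guard like this at the top of `deserialize` turns a corrupted record into an explicit failure.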
Compression Example
Wrap serializers with compression for large payloads.
import { Queue, createJsonSerde, type Serde } from '@platformatic/job-queue'
import { gzipSync, gunzipSync } from 'zlib'
class CompressedSerde<T> implements Serde<T> {
constructor(private innerSerde: Serde<T>) {}
serialize(value: T): Buffer {
const serialized = this.innerSerde.serialize(value);
return gzipSync(serialized);
}
deserialize(buffer: Buffer): T {
const decompressed = gunzipSync(buffer);
return this.innerSerde.deserialize(decompressed);
}
}
// Wrap JSON serializer with compression
const queue = new Queue({
storage,
payloadSerde: new CompressedSerde(createJsonSerde()),
resultSerde: new CompressedSerde(createJsonSerde())
});
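Gzip adds a fixed overhead, so very small payloads can grow when compressed. One possible refinement, sketched here under assumptions, is to compress only above a size threshold and record the choice in a one-byte prefix. `ThresholdCompressedSerde`, the 1024-byte default, and the stand-in `jsonSerde` are hypothetical and not part of the library.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Local copy of the Serde contract so the sketch runs on its own.
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

const RAW = 0;  // prefix byte: body stored as-is
const GZIP = 1; // prefix byte: body is gzip-compressed

// Hypothetical wrapper: compresses only when the serialized payload
// reaches `threshold` bytes.
class ThresholdCompressedSerde<T> implements Serde<T> {
  private inner: Serde<T>;
  private threshold: number;

  constructor(inner: Serde<T>, threshold = 1024) {
    this.inner = inner;
    this.threshold = threshold;
  }

  serialize(value: T): Buffer {
    const serialized = this.inner.serialize(value);
    if (serialized.length < this.threshold) {
      return Buffer.concat([Buffer.from([RAW]), serialized]);
    }
    return Buffer.concat([Buffer.from([GZIP]), gzipSync(serialized)]);
  }

  deserialize(buffer: Buffer): T {
    const body = buffer.subarray(1);
    return this.inner.deserialize(buffer[0] === GZIP ? gunzipSync(body) : body);
  }
}

// Stand-in JSON serde so the sketch does not depend on the library.
const jsonSerde: Serde<unknown> = {
  serialize: (value) => Buffer.from(JSON.stringify(value), 'utf8'),
  deserialize: (buffer) => JSON.parse(buffer.toString('utf8'))
};

const serde = new ThresholdCompressedSerde(jsonSerde, 1024);
const small = serde.serialize({ id: 1 });                             // stored raw
const large = serde.serialize({ text: 'lorem ipsum '.repeat(1000) }); // gzipped
```

The prefix byte makes the stored format self-describing, so records written before and after a threshold change can still be read.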
Encryption Example
Encrypt sensitive data in payloads and results.
import { Queue, createJsonSerde, type Serde } from '@platformatic/job-queue'
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto'
class EncryptedSerde<T> implements Serde<T> {
constructor(
private innerSerde: Serde<T>,
private key: Buffer // 32 bytes for AES-256
) {}
serialize(value: T): Buffer {
const serialized = this.innerSerde.serialize(value);
const iv = randomBytes(16);
const cipher = createCipheriv('aes-256-cbc', this.key, iv);
const encrypted = Buffer.concat([
cipher.update(serialized),
cipher.final()
]);
// Prepend IV for decryption
return Buffer.concat([iv, encrypted]);
}
deserialize(buffer: Buffer): T {
const iv = buffer.subarray(0, 16);
const encrypted = buffer.subarray(16);
const decipher = createDecipheriv('aes-256-cbc', this.key, iv);
const decrypted = Buffer.concat([
decipher.update(encrypted),
decipher.final()
]);
return this.innerSerde.deserialize(decrypted);
}
}
// Use with sensitive data
const encryptionKey = Buffer.from(process.env.ENCRYPTION_KEY!, 'hex');
const queue = new Queue({
storage,
payloadSerde: new EncryptedSerde(
createJsonSerde(),
encryptionKey
)
});
// Payloads are now encrypted in storage
await queue.enqueue({
creditCard: '4111-1111-1111-1111',
ssn: '123-45-6789'
});
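CBC mode keeps the stored bytes confidential but does not detect modification. As an alternative sketch (a swapped-in technique, not the approach shown above), the following uses AES-256-GCM from Node's `crypto` module, which appends an authentication tag that is verified on decryption. The class name `AuthenticatedEncryptedSerde`, the byte layout, and the stand-in `jsonSerde` are assumptions made for illustration.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto';

// Local copy of the Serde contract so the sketch runs on its own.
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

const IV_LENGTH = 12;  // nonce size commonly used with GCM
const TAG_LENGTH = 16; // default GCM authentication tag size

// Hypothetical authenticated variant of the EncryptedSerde idea.
class AuthenticatedEncryptedSerde<T> implements Serde<T> {
  private inner: Serde<T>;
  private key: Buffer;

  constructor(inner: Serde<T>, key: Buffer) {
    if (key.length !== 32) throw new Error('AES-256 requires a 32-byte key');
    this.inner = inner;
    this.key = key;
  }

  serialize(value: T): Buffer {
    const iv = randomBytes(IV_LENGTH);
    const cipher = createCipheriv('aes-256-gcm', this.key, iv);
    const encrypted = Buffer.concat([
      cipher.update(this.inner.serialize(value)),
      cipher.final()
    ]);
    // Layout: [iv: 12 bytes][auth tag: 16 bytes][ciphertext]
    return Buffer.concat([iv, cipher.getAuthTag(), encrypted]);
  }

  deserialize(buffer: Buffer): T {
    const iv = buffer.subarray(0, IV_LENGTH);
    const tag = buffer.subarray(IV_LENGTH, IV_LENGTH + TAG_LENGTH);
    const encrypted = buffer.subarray(IV_LENGTH + TAG_LENGTH);
    const decipher = createDecipheriv('aes-256-gcm', this.key, iv);
    decipher.setAuthTag(tag);
    // final() throws if the ciphertext or the tag was modified.
    const decrypted = Buffer.concat([decipher.update(encrypted), decipher.final()]);
    return this.inner.deserialize(decrypted);
  }
}

// Stand-in JSON serde so the sketch does not depend on the library.
const jsonSerde: Serde<unknown> = {
  serialize: (value) => Buffer.from(JSON.stringify(value), 'utf8'),
  deserialize: (buffer) => JSON.parse(buffer.toString('utf8'))
};

const secureSerde = new AuthenticatedEncryptedSerde(jsonSerde, randomBytes(32));
const sealed = secureSerde.serialize({ ssn: '123-45-6789' });
const opened = secureSerde.deserialize(sealed) as { ssn: string };
```

In a real deployment the key would come from a secret store rather than `randomBytes`, as in the environment-variable example above.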
Separate Payload and Result Serializers
Use different serializers for payloads and results.
import { MessagePackSerde } from './msgpack-serde'
import { createJsonSerde } from '@platformatic/job-queue'
interface BinaryPayload {
imageData: Buffer
metadata: Record<string, any>
}
interface SimpleResult {
success: boolean
url: string
}
const queue = new Queue<BinaryPayload, SimpleResult>({
storage,
// Use MessagePack for binary payload
payloadSerde: new MessagePackSerde(),
// Use JSON for simple result
resultSerde: createJsonSerde()
});
queue.process(async (job) => {
const { imageData, metadata } = job.payload;
const url = await uploadImage(imageData, metadata);
return { success: true, url };
});
const result = await queue.enqueueAndWait(
{
imageData: Buffer.from('...'),
metadata: { userId: '123' }
},
{ timeout: 30000 }
);
console.log('Uploaded to:', result.url);
Benchmarking Serializers
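Measure size and throughput with a payload that resembles your real jobs before switching serializers.
import { performance } from 'perf_hooks'
import { createJsonSerde, type Serde } from '@platformatic/job-queue'
import { MessagePackSerde } from './msgpack-serde'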
function benchmarkSerde<T>(serde: Serde<T>, value: T, iterations = 10000) {
// Serialize benchmark
const serializeStart = performance.now();
let totalSize = 0;
for (let i = 0; i < iterations; i++) {
const buffer = serde.serialize(value);
totalSize += buffer.length;
}
const serializeTime = performance.now() - serializeStart;
// Deserialize benchmark
const buffer = serde.serialize(value);
const deserializeStart = performance.now();
for (let i = 0; i < iterations; i++) {
serde.deserialize(buffer);
}
const deserializeTime = performance.now() - deserializeStart;
return {
serializeMs: serializeTime,
deserializeMs: deserializeTime,
avgSize: totalSize / iterations,
serializeOpsPerSec: (iterations / serializeTime) * 1000,
deserializeOpsPerSec: (iterations / deserializeTime) * 1000
};
}
const payload = {
id: 'job-123',
data: { /* complex object */ },
timestamp: Date.now()
};
console.log('JSON:', benchmarkSerde(createJsonSerde(), payload));
console.log('MessagePack:', benchmarkSerde(new MessagePackSerde(), payload));
Choosing a Serializer
| Serializer | Size | Speed | Use Case |
|---|---|---|---|
| JSON | Large | Medium | Simple objects, debugging |
| MessagePack | Small | Fast | General purpose, binary data |
| Protocol Buffers | Smallest | Fastest | High throughput, typed schemas |
| Custom Binary | Smallest | Fastest | Specialized data formats |
| Compressed JSON | Very Small | Slow | Large text payloads |
| Encrypted | Same as inner | Slowest | Sensitive data |
Recommendations:
- Development: Use JSON for easy debugging
- Production: Use MessagePack for better performance
- High scale: Use Protocol Buffers or custom binary
- Large payloads: Add compression
- Sensitive data: Add encryption
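When a payload is both large and sensitive, the order of the wrappers matters: encrypted bytes look random and barely compress, so compression should run first. The sketch below illustrates this with a hypothetical `withTransforms` helper and `ByteTransform` type (neither is part of the library), reusing the gzip and AES-256-CBC approaches from the examples above.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto';
import { gzipSync, gunzipSync } from 'node:zlib';

// Local copy of the Serde contract so the sketch runs on its own.
interface Serde<T> {
  serialize(value: T): Buffer
  deserialize(buffer: Buffer): T
}

// A reversible bytes-to-bytes step layered on top of a serde (hypothetical type).
interface ByteTransform {
  encode(input: Buffer): Buffer
  decode(input: Buffer): Buffer
}

// Applies transforms in order when serializing and in reverse when deserializing.
function withTransforms<T>(inner: Serde<T>, transforms: ByteTransform[]): Serde<T> {
  return {
    serialize: (value: T): Buffer =>
      transforms.reduce((bytes, step) => step.encode(bytes), inner.serialize(value)),
    deserialize: (buffer: Buffer): T =>
      inner.deserialize(transforms.reduceRight((bytes, step) => step.decode(bytes), buffer))
  };
}

const gzip: ByteTransform = {
  encode: (input) => gzipSync(input),
  decode: (input) => gunzipSync(input)
};

// AES-256-CBC with the IV prepended, mirroring the encryption example.
function aesCbc(key: Buffer): ByteTransform {
  return {
    encode(input) {
      const iv = randomBytes(16);
      const cipher = createCipheriv('aes-256-cbc', key, iv);
      return Buffer.concat([iv, cipher.update(input), cipher.final()]);
    },
    decode(input) {
      const decipher = createDecipheriv('aes-256-cbc', key, input.subarray(0, 16));
      return Buffer.concat([decipher.update(input.subarray(16)), decipher.final()]);
    }
  };
}

// Stand-in JSON serde so the sketch does not depend on the library.
const jsonSerde: Serde<unknown> = {
  serialize: (value) => Buffer.from(JSON.stringify(value), 'utf8'),
  deserialize: (buffer) => JSON.parse(buffer.toString('utf8'))
};

const key = randomBytes(32);
const payload = { text: 'lorem ipsum '.repeat(1000) };

const compressThenEncrypt = withTransforms(jsonSerde, [gzip, aesCbc(key)]);
const encryptThenCompress = withTransforms(jsonSerde, [aesCbc(key), gzip]);

const goodSize = compressThenEncrypt.serialize(payload).length;
const badSize = encryptThenCompress.serialize(payload).length;
console.log({ goodSize, badSize }); // goodSize is far smaller for this repetitive payload
```

Both orderings round-trip correctly; only the stored size differs, which is why the compression step belongs closest to the inner serializer.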