Actor Model Pattern
Implement the Actor Model using go-go-scope channels for message-passing concurrency. Actors are isolated units that communicate via messages, eliminating shared mutable state.Basic Actor
Create a simple actor that processes messages sequentially:import { scope, Channel } from 'go-go-scope';
interface Message {
type: 'increment' | 'decrement' | 'get';
reply?: Channel<number>;
}
class CounterActor {
private count = 0;
private mailbox: Channel<Message>;
constructor(mailbox: Channel<Message>) {
this.mailbox = mailbox;
}
async start(signal: AbortSignal) {
for await (const msg of this.mailbox) {
if (signal.aborted) break;
switch (msg.type) {
case 'increment':
this.count++;
break;
case 'decrement':
this.count--;
break;
case 'get':
if (msg.reply) {
await msg.reply.send(this.count);
msg.reply.close();
}
break;
}
}
}
}
// Usage
await using s = scope();
const mailbox = s.channel<Message>(100);
const counter = new CounterActor(mailbox);
// Start actor
s.task(({ signal }) => counter.start(signal));
// Send messages
await mailbox.send({ type: 'increment' });
await mailbox.send({ type: 'increment' });
// Get current count
const replyCh = s.channel<number>(1);
await mailbox.send({ type: 'get', reply: replyCh });
const count = await replyCh.receive();
console.log('Count:', count); // 2
Typed Actor System
Define actor protocol
Create type-safe message definitions:
type Request<T> = T & { reply?: Channel<Response<T>> };
type Response<T> =
| { success: true; data: unknown }
| { success: false; error: string };
// User actor messages
type UserMessage =
| { type: 'create'; name: string; email: string }
| { type: 'update'; id: string; name?: string; email?: string }
| { type: 'delete'; id: string }
| { type: 'get'; id: string };
interface User {
id: string;
name: string;
email: string;
}
Implement stateful actor
Build an actor with internal state:
class UserActor {
private users = new Map<string, User>();
private mailbox: Channel<Request<UserMessage>>;
constructor(mailbox: Channel<Request<UserMessage>>) {
this.mailbox = mailbox;
}
async start(signal: AbortSignal) {
for await (const msg of this.mailbox) {
if (signal.aborted) break;
try {
const result = await this.handleMessage(msg);
if (msg.reply) {
await msg.reply.send({ success: true, data: result });
msg.reply.close();
}
} catch (error) {
if (msg.reply) {
await msg.reply.send({
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
});
msg.reply.close();
}
}
}
}
private async handleMessage(msg: Request<UserMessage>): Promise<unknown> {
switch (msg.type) {
case 'create': {
const user: User = {
id: crypto.randomUUID(),
name: msg.name,
email: msg.email,
};
this.users.set(user.id, user);
return user;
}
case 'update': {
const user = this.users.get(msg.id);
if (!user) throw new Error('User not found');
if (msg.name) user.name = msg.name;
if (msg.email) user.email = msg.email;
return user;
}
case 'delete': {
const deleted = this.users.delete(msg.id);
if (!deleted) throw new Error('User not found');
return { deleted: true };
}
case 'get': {
const user = this.users.get(msg.id);
if (!user) throw new Error('User not found');
return user;
}
}
}
}
Create actor client
Build a client for easy actor communication:
class UserActorClient {
constructor(private mailbox: Channel<Request<UserMessage>>) {}
async createUser(name: string, email: string): Promise<User> {
const reply = new Channel<Response<UserMessage>>(1);
await this.mailbox.send({
type: 'create',
name,
email,
reply,
});
const response = await reply.receive();
if (!response || !response.success) {
throw new Error(response?.error ?? 'Request failed');
}
return response.data as User;
}
async getUser(id: string): Promise<User> {
const reply = new Channel<Response<UserMessage>>(1);
await this.mailbox.send({ type: 'get', id, reply });
const response = await reply.receive();
if (!response || !response.success) {
throw new Error(response?.error ?? 'Request failed');
}
return response.data as User;
}
}
Actor Supervision
Implement supervision strategies for fault tolerance:import { scope } from 'go-go-scope';
class Supervisor {
private restarts = 0;
private maxRestarts = 3;
async supervise(actorFactory: () => Promise<void>) {
await using s = scope();
while (this.restarts < this.maxRestarts) {
const [err] = await s.task(actorFactory, {
retry: {
maxRetries: 0, // Handle restarts at supervisor level
},
});
if (!err) {
// Actor completed successfully
return;
}
this.restarts++;
console.error(`Actor failed, restart ${this.restarts}/${this.maxRestarts}`);
// Exponential backoff before restart
await new Promise(r => setTimeout(r, Math.pow(2, this.restarts) * 1000));
}
throw new Error('Actor failed permanently after max restarts');
}
}
// Usage
const supervisor = new Supervisor();
await supervisor.supervise(async () => {
await using s = scope();
const mailbox = s.channel<Message>(100);
const actor = new CounterActor(mailbox);
await actor.start(s.signal);
});
Actor Pool
Create a pool of worker actors for load balancing:import { scope, Channel } from 'go-go-scope';
interface WorkMessage<T> {
data: T;
reply: Channel<unknown>;
}
class ActorPool<T> {
private workers: Channel<WorkMessage<T>>[] = [];
private roundRobin = 0;
async start(poolSize: number, workerFn: (data: T) => Promise<unknown>) {
await using s = scope();
// Create worker actors
for (let i = 0; i < poolSize; i++) {
const mailbox = s.channel<WorkMessage<T>>(100);
this.workers.push(mailbox);
// Start worker
s.task(async ({ signal }) => {
for await (const msg of mailbox) {
if (signal.aborted) break;
try {
const result = await workerFn(msg.data);
await msg.reply.send({ success: true, data: result });
} catch (error) {
await msg.reply.send({
success: false,
error: error instanceof Error ? error.message : 'Unknown',
});
} finally {
msg.reply.close();
}
}
});
}
}
async send(data: T): Promise<unknown> {
// Round-robin load balancing
const worker = this.workers[this.roundRobin];
this.roundRobin = (this.roundRobin + 1) % this.workers.length;
if (!worker) throw new Error('No workers available');
const reply = new Channel<unknown>(1);
await worker.send({ data, reply });
const response = await reply.receive();
return response;
}
}
// Usage
const pool = new ActorPool<number>();
await pool.start(4, async (num) => {
// CPU-intensive work
return num * num;
});
const result = await pool.send(42);
console.log('Result:', result);
Best Practices
- Immutable messages: Never mutate messages after sending
- Single responsibility: Each actor handles one concern
- Bounded mailboxes: Set capacity limits to prevent memory issues
- Timeout handling: Use timeouts for request-reply patterns
- Supervision: Always supervise actors for fault tolerance
- Backpressure: Use channel backpressure to handle overload
Related Patterns
- Pipeline Processing - Chain actors for data transformation
- Rate Limiting - Control message flow
- Retry Strategies - Handle actor failures