import { scope } from 'go-go-scope';
async function parallelPipeline() {
await using s = scope();
// Input data
const input = s.channel<number>(1000);
const output = s.channel<number>(1000);
// Producer
s.task(async () => {
for (let i = 0; i < 10000; i++) {
await input.send(i);
}
input.close();
});
// Parallel processing stage (4 workers)
const workers = 4;
for (let i = 0; i < workers; i++) {
s.task(async ({ signal }) => {
for await (const value of input) {
if (signal.aborted) break;
// CPU-intensive operation
const result = await processData(value);
await output.send(result);
}
});
}
// Monitor when all workers finish
s.task(async ({ signal }) => {
// Wait for input channel to close
while (!input.isClosed && !signal.aborted) {
await new Promise(r => setTimeout(r, 100));
}
// Wait for all items to be processed
while (input.size > 0 && !signal.aborted) {
await new Promise(r => setTimeout(r, 10));
}
output.close();
});
// Consumer
const results: number[] = [];
for await (const result of output) {
results.push(result);
}
return results;
}