Control Flow
Common composition patterns using the sequencer DSL.
Basic Pipeline
Chain blocks sequentially:
const pipeline = sequencer({
name: "process",
inputSchema: z.object({ message: z.string() }),
})
.then(validateBlock)
.then(chatGen)
.then(saveBlock);
Input Transformation
Use a connector function to transform input between blocks:
pipeline.then(
(output, ctx) => ({ query: output.text }), // connector
searchBlock // block
);
Conditional Steps
Execute a block only when a condition is true:
pipeline.thenIf(
(input, ctx) => ctx.session.state.needsReview,
reviewBlock
);
Parallel Execution
Run multiple blocks concurrently:
pipeline.parallel({
analysis: analysisBlock,
summary: summaryBlock,
tags: { connector: (input) => input.text, block: tagBlock },
}, { maxConcurrency: 3 });
// Output: { analysis: ..., summary: ..., tags: ... }
Iteration
Process array items:
pipeline
.map((input) => input.items) // Extract array
.forEach(processItemBlock, { maxConcurrency: 5 });
// Output: ProcessedItem[]
With a connector to extract the array:
pipeline.forEach(
(input) => input.items,
processItemBlock,
{ maxConcurrency: 5 }
);
Loops
Loop Until Condition
pipeline.doUntil(
(value, ctx) => value.confidence > 0.9,
refineBlock
);
Loop While Condition
pipeline.doWhile(
(value, ctx) => value.remaining > 0,
processNextBatch
);
Loop Back to Named Step
pipeline
.then(generateBlock) // step name from block.name
.then(validateBlock)
.loopBack("generate-block", {
when: (value, ctx) => !value.isValid,
maxIterations: 3,
});
Background Work
Queue non-blocking side effects:
pipeline
.then(mainProcessing)
.work(analyticsBlock) // Runs in background
.work(notificationBlock) // Runs in background
.then(nextStep); // Continues immediately
Work failures do NOT abort the main chain. They emit step_error items.
Wait for Work
pipeline
.work(taskA)
.work(taskB)
.waitForWork({ failOnError: false }); // Wait, don't fail on work errors
Side Effects
Execute a block without changing the main payload:
pipeline
.tap(logBlock) // Log but don't change output
.then(nextStep); // Receives original output
Conditional side effects:
pipeline.tapIf(
(value, ctx) => value.score < 0.5,
alertLowScoreBlock
);
Error Recovery
Catch errors and route to recovery blocks:
pipeline
.then(riskyBlock)
.rescue([
{ when: [NetworkError], block: retryWithBackupBlock },
{ when: [ModelError], block: fallbackModelBlock },
{ block: genericRecoveryBlock }, // catch-all
]);
Rescue handlers match by error type (checked in order). Success converts back to the normal chain.
Branching
Execute the first branch whose condition matches:
pipeline.branch({
urgent: [
(input) => input,
(input) => input.priority === "high",
urgentBlock,
],
normal: [
(input) => input,
(input) => input.priority !== "high",
normalBlock,
],
});
Each branch is a tuple: [connector, condition, block].