Composing Blocks
This page covers the six sequencer methods you'll reach for on day one. They're enough to build most pipelines. The rest of the DSL (parallelism, looping, racing, branching) lives in the Control Flow Reference, and you can ignore it until you actually need it.
The six methods are:
| Method | What it does |
|---|---|
| then(block) | Run a block. Its output becomes the next step's input. |
| map(fn) | Inline transform. No block. Reshape the value between steps. |
| tap(block) | Run a block for its side effect. The pipeline value passes through unchanged. |
| thenIf(cond, block) | Run a block only when a condition is true. |
| work(block) | Fire a block in the background. The chain continues immediately. |
| rescue(handlers) | Catch errors and route to recovery blocks. |
The example on this page builds up an order-processing pipeline one method at a time. Each addition introduces one new method.
Setup: a few blocks
We'll compose three blocks: a validator, an LLM-based pricing generator, and a recorder that writes the result to session state. The internals are simplified — focus on how they chain together.
import { handler, generator, sequencer } from "@flow-state-dev/core";
import { z } from "zod";

const orderInput = z.object({
  items: z.array(z.object({ sku: z.string(), qty: z.number() })),
  customerId: z.string(),
});

const pricedOrder = z.object({
  total: z.number(),
  discounted: z.boolean(),
});
// Validator: throws on bad input. No output, so the pipeline value passes through.
const validateOrder = handler({
  name: "validate-order",
  inputSchema: orderInput,
  execute: async (input) => {
    if (input.items.length === 0) throw new Error("Empty order");
  },
});
// Generator: applies pricing rules to the order.
const priceOrder = generator({
  name: "price-order",
  model: "preset/fast",
  prompt: "You apply customer-specific pricing rules.",
  inputSchema: orderInput,
  outputSchema: pricedOrder,
  user: (input) => `Price this order: ${JSON.stringify(input)}`,
});
// Recorder: writes the priced result to session state and returns an order id.
const recordOrder = handler({
  name: "record-order",
  inputSchema: pricedOrder,
  outputSchema: z.object({ orderId: z.string() }),
  sessionStateSchema: z.object({
    lastOrderId: z.string().optional(),
    lastOrderTotal: z.number().optional(),
  }),
  execute: async (input, ctx) => {
    const orderId = crypto.randomUUID();
    await ctx.session.patchState({
      lastOrderId: orderId,
      lastOrderTotal: input.total,
    });
    return { orderId };
  },
});
validateOrder has no outputSchema and returns nothing. That's the correct shape for a pure check: throw if the input is bad, otherwise stay out of the way. We'll attach it with .tap() below.
recordOrder declares a sessionStateSchema so TypeScript types ctx.session.state to those fields. See State and scopes for the full state API.
then — sequential steps
Chain blocks with .then(). Each block's output is the next block's input. TypeScript checks the types between steps.
const orderPipeline = sequencer({
  name: "order-pipeline",
  inputSchema: orderInput,
})
  .then(priceOrder)
  .then(recordOrder);
priceOrder produces { total, discounted }. recordOrder consumes that shape and produces { orderId }. The sequencer's output type is whatever the last step returns.
If the next block expects a different shape, pass a connector function as the first argument:
.then((output) => ({ amount: output.total }), chargeBlock)
The connector receives the previous output and returns the input the next block expects. See Connectors.
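To make the connector's role concrete, here is a minimal sketch that models a block as a plain function and a connector as an adapter placed in front of it. The `Block` type, the `withConnector` helper, and the body of `chargeBlock` are hypothetical stand-ins written for illustration. They are not the library's types or its implementation.

```typescript
// Toy model: a "block" is just a function from input to output.
// Every name here is hypothetical and not part of @flow-state-dev/core.
type Block<I, O> = (input: I) => O;

type Priced = { total: number; discounted: boolean };
type ChargeInput = { amount: number };
type ChargeResult = { charged: number };

// Stand-in for the chargeBlock mentioned above; it expects { amount }.
const chargeBlock: Block<ChargeInput, ChargeResult> = (input) => ({
  charged: input.amount,
});

// A connector adapts the previous step's output to the next block's input.
function withConnector<Prev, I, O>(
  connector: (prev: Prev) => I,
  block: Block<I, O>,
): Block<Prev, O> {
  return (prev) => block(connector(prev));
}

const chargeFromPriced = withConnector(
  (output: Priced) => ({ amount: output.total }),
  chargeBlock,
);

const receipt = chargeFromPriced({ total: 42, discounted: false });
```

In this model the compiler rejects a connector whose return type does not match the block's input, which is the same kind of check the text describes between steps.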
tap — side effects without changing the payload
Some blocks exist to do something — log, validate, mutate state — without transforming the pipeline value. Use .tap(). The block runs, the value passes through unchanged.
sequencer({ name: "order-pipeline", inputSchema: orderInput })
  .tap(validateOrder) // throws if invalid; otherwise no-op
  .then(priceOrder) // still receives the original orderInput
  .then(recordOrder);
priceOrder gets the original input, not anything from validateOrder. validateOrder has no output to merge in — it either throws or it doesn't.
This is the right shape for a validator. Don't write a handler that returns its input back out just to satisfy .then() — that pollutes the items log with a redundant echo. If a block has no meaningful output, drop the outputSchema and use .tap().
The same applies to logging, telemetry, and state mutation. If the only purpose is the side effect, it's a tap.
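The pass-through behavior described above can be modeled in a few lines. This is a toy sketch under the assumption that a tap simply runs its effect and hands back the same value. The `tap` function and the `Order` type below are hypothetical and say nothing about how the library implements the method.

```typescript
// Toy model of tap semantics; hypothetical helper, not the library's code.
type Order = { items: { sku: string; qty: number }[]; customerId: string };

// Runs `effect` for its side effect (or its throw) and returns the value untouched.
function tap<T>(value: T, effect: (value: T) => void): T {
  effect(value);
  return value;
}

const validate = (order: Order): void => {
  if (order.items.length === 0) throw new Error("Empty order");
};

const order: Order = { items: [{ sku: "A-1", qty: 2 }], customerId: "c-9" };
const emptyOrder: Order = { items: [], customerId: "c-9" };

// The same object comes out the other side, ready for the next step.
const passedThrough = tap(order, validate);

// A failing tap still halts the chain, because the throw propagates.
let rejected = false;
try {
  tap(emptyOrder, validate);
} catch {
  rejected = true;
}
```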
map — inline transform
Sometimes you need to reshape a value between steps without running a block. Use .map(). It's a pure function, not a block, so it doesn't show up as its own step in the items log.
.then(priceOrder)
.map((priced) => ({ ...priced, totalCents: Math.round(priced.total * 100) }))
.then(recordOrder)
map is for cheap synchronous reshaping. If the transform has side effects, hits the network, or you want it to appear as a step in the trace, use a handler block with .then() instead.
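The `Math.round` in the callback above is worth a closer look, since currency totals stored as floating-point numbers rarely multiply cleanly. The sketch below uses a hypothetical `withCents` helper that mirrors the map callback and compares rounding against truncation for one well-known problem value.

```typescript
// Hypothetical helper mirroring the map callback above.
type Priced = { total: number; discounted: boolean };

const withCents = (priced: Priced) => ({
  ...priced,
  totalCents: Math.round(priced.total * 100),
});

// 4.35 * 100 evaluates to 434.99999999999994 in IEEE 754 doubles,
// so truncating would lose a cent; rounding recovers the intended 435.
const truncated = Math.floor(4.35 * 100);
const rounded = withCents({ total: 4.35, discounted: false }).totalCents;
```

Because the callback is pure and synchronous, it fits the description of map given above: no side effects, no network, nothing worth tracing as its own step.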
thenIf — run a step only when a condition holds
What if you only want to apply a discount on larger orders that aren't already discounted? Wrap the discount step in .thenIf():
const applyVolumeDiscount = handler({
  name: "apply-volume-discount",
  inputSchema: pricedOrder,
  outputSchema: pricedOrder,
  execute: async (input) => ({
    total: input.total * 0.9,
    discounted: true,
  }),
});

sequencer({ name: "order-pipeline", inputSchema: orderInput })
  .tap(validateOrder)
  .then(priceOrder)
  .thenIf(
    (value) => value.total > 100 && !value.discounted,
    applyVolumeDiscount
  )
  .then(recordOrder);
The condition gets the current pipeline value and the block context. It can be sync or async. When it returns false the step is skipped and the pipeline value passes through unchanged — the next step still receives priceOrder's output.
You can also reach into the context to read state from any scope the surrounding block has declared:
.thenIf(
  (value, ctx) => ctx.session.state.features.discountsEnabled,
  applyVolumeDiscount
)
thenIf also accepts a static boolean if the condition is known at build time:
.thenIf(ENABLE_DISCOUNTS, applyVolumeDiscount)
For conditional side effects (a tap that runs only when something is true) use tapIf. For conditional background work, workIf. Both are in the Control Flow Reference.
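The run-or-skip behavior of thenIf can be sketched as a small function. This is a simplified synchronous model: the real condition may be async and also receives the block context, both of which are left out here. The `thenIf` helper and `Condition` type are hypothetical names, not the library's implementation.

```typescript
// Toy model of thenIf; hypothetical helper, not the library's implementation.
type Priced = { total: number; discounted: boolean };
type Condition<T> = boolean | ((value: T) => boolean);

function thenIf<T>(value: T, cond: Condition<T>, step: (value: T) => T): T {
  // Accept either a predicate or a static boolean.
  const shouldRun = typeof cond === "function" ? cond(value) : cond;
  // When the condition is false the step is skipped and the value is untouched.
  return shouldRun ? step(value) : value;
}

const applyDiscount = (p: Priced): Priced => ({
  total: p.total * 0.9,
  discounted: true,
});
const eligible = (p: Priced): boolean => p.total > 100 && !p.discounted;

const small: Priced = { total: 40, discounted: false };
const large: Priced = { total: 200, discounted: false };

const smallResult = thenIf(small, eligible, applyDiscount); // skipped
const largeResult = thenIf(large, eligible, applyDiscount); // discounted
const flagOff = thenIf(large, false, applyDiscount); // static boolean, skipped
```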
work — fire-and-forget background tasks
Some work shouldn't block the user. Analytics is the classic example: you want it to run, but the customer doesn't have to wait for it. .work() queues a block in the background and lets the chain continue immediately.
const trackOrder = handler({
  name: "track-order",
  inputSchema: z.object({ orderId: z.string() }),
  execute: async (input, ctx) => {
    await fetch("https://analytics.example.com/orders", {
      method: "POST",
      // Declare the body as JSON so the receiving service parses it correctly.
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ orderId: input.orderId }),
      signal: ctx.signal,
    });
  },
});
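// Build a fresh sequencer here: the earlier orderPipeline already has
// priceOrder and recordOrder chained onto it.
sequencer({ name: "order-pipeline", inputSchema: orderInput })
  .tap(validateOrder)
  .then(priceOrder)
  .then(recordOrder)
  .work(trackOrder); // dispatched, not awaited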
The chain does not wait for trackOrder. It is dispatched as soon as recordOrder finishes and runs alongside any steps that follow. The sequencer still waits for outstanding .work() tasks to settle before it returns, so they don't get orphaned, but they don't slow the main chain.
If a .work() block throws, the main chain is not aborted. The error is recorded as a step_error item on the work side. This is the right shape for non-essential work where failure is recoverable.
For more on background work — including waiting on results, fan-out over arrays, and conditional dispatch — see Side Chains.
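The three properties described in this section (dispatch without awaiting, record failures instead of rethrowing, settle before returning) can be sketched with plain promises. The `runWithWork` function and `WorkError` type are hypothetical, and the recorded error shape is an assumption chosen for the example. The library's actual step_error item may look different.

```typescript
// Toy model of fire-and-forget work; hypothetical names, not the library's code.
type WorkError = { step: string; message: string };

async function runWithWork<T>(
  value: T,
  workName: string,
  work: (value: T) => Promise<void>,
  rest: (value: T) => Promise<T>,
): Promise<{ output: T; workErrors: WorkError[] }> {
  const workErrors: WorkError[] = [];

  // Dispatch without awaiting; a failure is recorded instead of rethrown.
  const pending = work(value).catch((err: unknown) => {
    const message = err instanceof Error ? err.message : String(err);
    workErrors.push({ step: workName, message });
  });

  // The main chain carries on right away.
  const output = await rest(value);

  // Settle outstanding work before returning so it is not orphaned.
  await pending;
  return { output, workErrors };
}

// A tracker that always fails, to show the main chain is unaffected.
const failingTracker = async (_order: { orderId: string }): Promise<void> => {
  throw new Error("analytics unavailable");
};

const demo = runWithWork(
  { orderId: "o-1" },
  "track-order",
  failingTracker,
  async (order) => order,
);
```

Awaiting `demo` resolves normally even though the tracker threw. The failure shows up only in `workErrors`.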
rescue — catch errors, route to recovery
By default a thrown error halts the sequencer. .rescue() lets you intercept errors and route them to recovery blocks based on error type.
class PaymentDeclinedError extends Error {}
class NetworkError extends Error {}

const retryWithDifferentProvider = handler({
  name: "retry-different-provider",
  inputSchema: z.unknown(),
  outputSchema: z.object({ orderId: z.string() }),
  execute: async () => {
    // ... try a backup payment provider, return { orderId } on success
    return { orderId: crypto.randomUUID() };
  },
});

const recordFailedOrder = handler({
  name: "record-failed-order",
  inputSchema: z.unknown(),
  outputSchema: z.object({ orderId: z.string() }),
  execute: async () => {
    return { orderId: "failed" };
  },
});
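// Build a fresh sequencer here: the earlier orderPipeline already has
// priceOrder and recordOrder chained onto it.
sequencer({ name: "order-pipeline", inputSchema: orderInput })
  .tap(validateOrder)
  .then(priceOrder)
  .then(recordOrder)
  .rescue([
    { when: [NetworkError], block: retryWithDifferentProvider },
    { when: [PaymentDeclinedError], block: recordFailedOrder },
  ]);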
Rescue entries are checked in order, and the first match runs. If the recovery block succeeds, its output continues down the chain. If no entry matches, the original error propagates up. Add a final entry without when to catch anything else.
rescue is the only error-handling primitive in the DSL. There's no try/catch wrapping at the chain level. If you don't add .rescue(), errors bubble out and the sequencer fails — which is usually what you want.
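The matching rules for rescue (in order, first match wins, an entry without when catches everything, no match rethrows) can be modeled with a try/catch and instanceof checks. This is a synchronous toy sketch. The `rescue` function, the `RescueEntry` type, and the `recover` field are hypothetical, and matching by instanceof is an assumption about how error types are compared.

```typescript
// Toy model of rescue matching; hypothetical types, not the library's implementation.
class PaymentDeclinedError extends Error {}
class NetworkError extends Error {}

type ErrorClass = new (...args: any[]) => Error;
type RescueEntry<O> = { when?: ErrorClass[]; recover: (err: unknown) => O };

function rescue<O>(run: () => O, entries: RescueEntry<O>[]): O {
  try {
    return run();
  } catch (err) {
    for (const entry of entries) {
      // An entry without `when` acts as a catch-all.
      const matches =
        entry.when === undefined || entry.when.some((cls) => err instanceof cls);
      if (matches) return entry.recover(err); // first match wins
    }
    throw err; // no entry matched: the original error propagates
  }
}

const entries: RescueEntry<string>[] = [
  { when: [NetworkError], recover: () => "retried" },
  { when: [PaymentDeclinedError], recover: () => "recorded-failure" },
];

// Builds a step that always throws the given error.
const failWith = (err: Error) => (): string => {
  throw err;
};

const fromNetwork = rescue(failWith(new NetworkError("timeout")), entries);
const fromDeclined = rescue(failWith(new PaymentDeclinedError("declined")), entries);
const fromCatchAll = rescue(failWith(new TypeError("unexpected")), [
  ...entries,
  { recover: () => "fallback" },
]);
```

Without the final catch-all entry, the `TypeError` in the last call would be rethrown unchanged.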
Putting it together
Here's the pipeline using all six methods:
const orderPipeline = sequencer({
  name: "order-pipeline",
  inputSchema: orderInput,
})
  .tap(validateOrder)
  .then(priceOrder)
  .map((priced) => ({ ...priced, totalCents: Math.round(priced.total * 100) }))
  .thenIf(
    (value) => value.total > 100 && !value.discounted,
    applyVolumeDiscount
  )
  .then(recordOrder)
  .work(trackOrder)
  .rescue([
    { when: [NetworkError], block: retryWithDifferentProvider },
    { block: recordFailedOrder },
  ]);
Read top to bottom: validate (throws on bad input), price, normalize the shape, maybe discount, record, fire analytics in the background, recover from known errors. That covers a real pipeline with a small set of methods.
When you outgrow this page
The six methods above handle the common cases. The DSL has more once you need it:
- Run several blocks at once → parallel, thenAll, forEach
- Loop until a condition is met → doUntil, doWhile, loopBack
- Conditional taps and work → tapIf, workIf, exitIf
- Fallback chains and races → thenAny, race, branch
- Wait on background work before continuing → waitForWork
- Per-block input adaptation → connectInput
All of these live in the Control Flow Reference, grouped by use case.