Recipe 19: Self-Healing Pipeline That Moves Mid-Flight
Situation
You want a pipeline to keep running through failures without a full restart.
Recipe 8 provides the minimal “rewiring” primitive. This recipe builds a larger story, a pipeline that:
- relocates queue/buffer state
- keeps durable offsets
- coordinates downstream drains using generation markers
The goal here is “unexpectedly cool” operational behavior: you can watch the pipeline bend around a broken node and keep moving, without an external orchestrator doing topology surgery.
Cross-Domain Boundary
This recipe is about local pipeline mobility. QueueLocator identifies a
local queue segment, and RelocatableQueueEdge records handoff/rebind
across lease generations. That is different from a replicated
queue or topic, where the logical stream has a stable resource name,
placement policy, replica-set locator, quorum behavior, replicated
consumer cursors, and idempotency.
Keep this recipe when you need a queue edge to move inside a local fabric
or cell. For cross-domain stream continuation, build a companion design
over grafos_replicated::ReplicatedFabricQueue or
grafos_replicated::ReplicatedTopic; local queue handoffs can be recorded
as recovery evidence, but they do not advance the logical resource
generation by themselves.
What You Build
A pipeline that:
- checkpoints offsets and commit markers via CheckpointedStageState
- relocates stage endpoints on failure via RelocatableQueueEdge
- uses FenceEpoch generation markers to prevent stale reads/writes
- coordinates downstream rebinding via HandoffWriter/HandoffReader
Building Blocks
- grafos_fence::FenceEpoch
- grafos_pipeline::{RelocatableQueueEdge, CheckpointedStageState}
- grafos_locator::{QueueLocator, HandoffWriter, HandoffReader}
- FabricQueue for inter-stage buffering
- grafos_observe tracing
Design
Generation Markers via FenceEpoch
FenceEpoch is the universal fence for topology changes. Each queue edge carries a generation:
use grafos_fence::FenceEpoch;
let gen = FenceEpoch::new(1);
// After relocation, bump the generation
let new_gen = gen.bump();
// Consumers reject stale generations
if msg_gen.is_stale(&current_gen) {
    // Discard stale message
}
This is the same idea as the fenced leader epoch in Recipe 4, applied to pipeline edges.
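The fencing rule can be sketched without the fabric at all. The block below is a minimal model, assuming that a generation is a monotonically increasing integer and that "stale" means "strictly older than the current generation". `Epoch`, `Tagged`, and `accept_fresh` are hypothetical names used for illustration; they are not the grafos_fence API.

```rust
/// Hypothetical stand-in for a fence epoch (not the real grafos_fence type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Epoch(u64);

impl Epoch {
    pub fn new(value: u64) -> Self {
        Epoch(value)
    }

    /// Return the next generation; the receiver is left unchanged.
    pub fn bump(self) -> Self {
        Epoch(self.0 + 1)
    }

    /// Assumption: a generation is stale if it is strictly older than `current`.
    pub fn is_stale(&self, current: &Epoch) -> bool {
        self.0 < current.0
    }

    pub fn value(&self) -> u64 {
        self.0
    }
}

/// A message tagged with the generation of the edge that produced it.
pub struct Tagged<T> {
    pub generation: Epoch,
    pub payload: T,
}

/// Keep only the payloads whose generation is not stale.
pub fn accept_fresh<T>(current: Epoch, inbox: Vec<Tagged<T>>) -> Vec<T> {
    inbox
        .into_iter()
        .filter(|msg| !msg.generation.is_stale(&current))
        .map(|msg| msg.payload)
        .collect()
}
```

Comparing against the consumer's own view of the current generation means a late write from a pre-relocation producer is dropped even if it arrives after fresh traffic.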
RelocatableQueueEdge
RelocatableQueueEdge wraps a FabricQueue with generation tracking and automatic relocation:
- push(item)/pop(): normal queue operations
- relocate(): allocates a new queue, publishes the new locator, bumps the generation
- generation(): returns the current FenceEpoch
When a queue’s lease expires or the connection drops, the producer calls relocate() and resumes
pushing to the new queue.
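To make the relocate-and-resume behavior concrete, here is a toy in-memory model. It is a sketch under stated assumptions: `ToyEdge`, `EdgeError`, `expire_lease`, and `push_with_relocation` are hypothetical, the lease is a boolean flag, and relocation simply swaps in an empty queue. The real RelocatableQueueEdge wraps a FabricQueue and may differ in signatures and error types.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub enum EdgeError {
    LeaseExpired,
}

/// Hypothetical in-memory model of a relocatable queue edge.
pub struct ToyEdge<T> {
    queue: VecDeque<T>,
    generation: u64,
    lease_valid: bool,
}

impl<T> ToyEdge<T> {
    pub fn new() -> Self {
        ToyEdge { queue: VecDeque::new(), generation: 1, lease_valid: true }
    }

    pub fn push(&mut self, item: T) -> Result<(), EdgeError> {
        if !self.lease_valid {
            return Err(EdgeError::LeaseExpired);
        }
        self.queue.push_back(item);
        Ok(())
    }

    pub fn pop(&mut self) -> Result<Option<T>, EdgeError> {
        if !self.lease_valid {
            return Err(EdgeError::LeaseExpired);
        }
        Ok(self.queue.pop_front())
    }

    /// Simulate the failure: the lease backing the queue is gone.
    pub fn expire_lease(&mut self) {
        self.lease_valid = false;
    }

    /// Swap in a fresh queue and bump the generation. Items left in the
    /// old queue are dropped here; recovering them is the job of replay.
    pub fn relocate(&mut self) {
        self.queue = VecDeque::new();
        self.lease_valid = true;
        self.generation += 1;
    }

    pub fn generation(&self) -> u64 {
        self.generation
    }
}

/// Push once; on lease expiry, relocate and retry a single time.
pub fn push_with_relocation<T: Clone>(edge: &mut ToyEdge<T>, item: T) -> Result<(), EdgeError> {
    match edge.push(item.clone()) {
        Err(EdgeError::LeaseExpired) => {
            edge.relocate();
            edge.push(item)
        }
        other => other,
    }
}
```

The single retry is a deliberate simplification. A production producer would more likely bound retries and back off, since a relocation can itself fail.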
Coordinating Drains via Handoff Records
When a stage relocates, downstream consumers need to discover the new queue. The handoff pattern uses block storage as a rendezvous point:
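As a mental model for the rendezvous, the sketch below replaces block storage with a shared in-memory slot. Everything in it is an assumption made for illustration: `Locator`, `Writer`, `Reader`, and `handoff_pair` are hypothetical names, and the rule that a reader reports a record only when its generation is newer than the last one it saw is inferred from the "poll for generation changes" behavior, not taken from the grafos_locator API.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical locator record; field names are illustrative.
#[derive(Debug, Clone, PartialEq)]
pub struct Locator {
    pub lease_id: u64,
    pub capacity: usize,
    pub generation: u64,
}

/// Shared slot standing in for the block-storage rendezvous point.
pub type Slot = Arc<Mutex<Option<Locator>>>;

pub struct Writer {
    slot: Slot,
}

pub struct Reader {
    slot: Slot,
    last_seen: u64,
}

/// Create a writer and a reader that share one rendezvous slot.
pub fn handoff_pair() -> (Writer, Reader) {
    let slot: Slot = Arc::new(Mutex::new(None));
    (Writer { slot: slot.clone() }, Reader { slot, last_seen: 0 })
}

impl Writer {
    /// Overwrite the record, but never with an older or equal generation.
    /// Returns whether the record was accepted.
    pub fn publish(&self, locator: &Locator) -> bool {
        let mut guard = self.slot.lock().unwrap();
        let newer = guard
            .as_ref()
            .map_or(true, |current| locator.generation > current.generation);
        if newer {
            *guard = Some(locator.clone());
        }
        newer
    }
}

impl Reader {
    /// Return the record only when its generation is newer than the last seen.
    pub fn poll(&mut self) -> Option<Locator> {
        let guard = self.slot.lock().unwrap();
        match guard.as_ref() {
            Some(loc) if loc.generation > self.last_seen => {
                self.last_seen = loc.generation;
                Some(loc.clone())
            }
            _ => None,
        }
    }
}
```

Rejecting older generations on the write side keeps a delayed producer from pointing consumers back at a queue that has already been abandoned.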
use grafos_locator::locator::QueueLocator;
use grafos_locator::handoff::{HandoffWriter, HandoffReader};
// Producer side: publish new queue location
let locator = QueueLocator::new(new_lease_id, 0, capacity, new_gen.value());
writer.publish(&locator)?;
// Consumer side: poll for generation changes
if let Some(state) = reader.poll()? {
    // state contains the new QueueLocator
    // Rebind to the new queue
}
Durable Offsets and Commit Markers
Each stage checkpoints its progress via CheckpointedStageState:
- input offset / last consumed id
- output commit marker (what downstream has acknowledged)
If a stage relocates, it resumes from the last committed offset.
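The resume rule can be illustrated with a small model of checkpointed progress. This is a sketch, assuming the input is an addressable slice and that relocation loses everything since the last checkpoint. `Checkpoint` and `Stage` are hypothetical; the real CheckpointedStageState may store more and persist differently.

```rust
/// Hypothetical checkpoint record (illustrative fields only).
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct Checkpoint {
    /// Offset of the next input item to consume.
    pub input_offset: usize,
    /// Number of outputs that downstream has acknowledged.
    pub output_committed: usize,
}

pub struct Stage {
    /// Survives relocation.
    durable: Checkpoint,
    /// Volatile progress since the last checkpoint.
    next_input: usize,
}

impl Stage {
    pub fn new() -> Self {
        Stage { durable: Checkpoint::default(), next_input: 0 }
    }

    /// Consume the next input item, if any, and advance volatile progress.
    pub fn consume(&mut self, input: &[u32]) -> Option<u32> {
        let item = input.get(self.next_input).copied();
        if item.is_some() {
            self.next_input += 1;
        }
        item
    }

    /// Persist the current input offset together with the downstream ack count.
    pub fn checkpoint(&mut self, acked: usize) {
        self.durable = Checkpoint { input_offset: self.next_input, output_committed: acked };
    }

    /// Roll back to the last checkpoint; returns the replay window size.
    pub fn relocate(&mut self) -> usize {
        let replayed = self.next_input - self.durable.input_offset;
        self.next_input = self.durable.input_offset;
        replayed
    }

    pub fn durable(&self) -> Checkpoint {
        self.durable
    }
}
```

The value returned by `relocate` is the number of items that will be processed a second time, which is why checkpoint frequency directly bounds the replay window.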
Define Your Correctness Boundary
“Exactly once” is not a single knob; you choose where you pay:
- at-least-once between stages (simpler)
- exactly-once at the final sink (common and practical)
This recipe assumes:
- stages may replay a small window after relocation
- the sink is idempotent (or deduplicates using a stable event id)
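One way to satisfy the second assumption is a sink that remembers the event ids it has already applied. The sketch below is illustrative only: `Event` and `DedupSink` are hypothetical, the side effect is a running total, and the seen-set grows without bound.

```rust
use std::collections::HashSet;

/// An event with a stable id that survives replay.
pub struct Event {
    pub id: u64,
    pub amount: i64,
}

/// Hypothetical sink that applies each event id at most once.
pub struct DedupSink {
    seen: HashSet<u64>,
    total: i64,
}

impl DedupSink {
    pub fn new() -> Self {
        DedupSink { seen: HashSet::new(), total: 0 }
    }

    /// Apply the event once; returns false if it was a duplicate.
    pub fn apply(&mut self, event: &Event) -> bool {
        // HashSet::insert returns false when the id was already present.
        if !self.seen.insert(event.id) {
            return false;
        }
        self.total += event.amount;
        true
    }

    pub fn total(&self) -> i64 {
        self.total
    }
}
```

In practice the seen-set only needs to cover the largest replay window you expect, so it can be pruned once upstream checkpoints have advanced past an id.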
Walkthrough
1. Steady-State Run
Producer pushes items through RelocatableQueueEdge. Consumer pops and processes. Both check
generation markers on every operation.
2. Inject Node Failure
The queue’s lease expires. push() returns an error.
3. Stage Relocates Output
// Producer detects failure and relocates
edge.relocate()?;
// New queue allocated, new locator published via HandoffWriter
// Generation bumped: consumers will see the change
4. Downstream Rebinding
// Consumer polls HandoffReader
if let Some(state) = handoff_reader.poll()? {
    // New generation detected: rebind to new queue
    consumer.rebind(state)?;
}
5. Resume Throughput
Consumer resumes from its last checkpointed offset. Any messages in the replay window are handled by sink idempotency.
Failure Modes
- Stage crashes: output queue lease may expire; downstream sees Disconnected/LeaseExpired and rebinds.
- Split brain: two producers write to two queues; FenceEpoch fencing prevents consumers from accepting a stale stream.
- Checkpoint lag: replay window increases; tune checkpoint frequency.
Observability
Track:
- relocation count per stage
- time-to-recover after Disconnected
- replay window size (events replayed after relocation)
- end-to-end latency distribution before/after failover
- current generation per edge (edge.generation().value())
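A minimal sketch of how the per-edge signals could be accumulated before being exported is shown below. `EdgeMetrics` and its methods are hypothetical and do not use the grafos_observe API; latency distributions are left out because they need a histogram rather than a counter.

```rust
use std::time::Duration;

/// Hypothetical per-edge counters (plain struct, no tracing backend).
#[derive(Debug, Default)]
pub struct EdgeMetrics {
    pub relocations: u64,
    pub last_recovery: Option<Duration>,
    pub events_replayed: u64,
    pub generation: u64,
}

impl EdgeMetrics {
    /// Record one completed relocation.
    pub fn record_relocation(&mut self, recovery: Duration, replayed: u64, new_generation: u64) {
        self.relocations += 1;
        self.last_recovery = Some(recovery);
        self.events_replayed += replayed;
        self.generation = new_generation;
    }

    /// Average number of events replayed per relocation (0.0 if none yet).
    pub fn mean_replay_window(&self) -> f64 {
        if self.relocations == 0 {
            0.0
        } else {
            self.events_replayed as f64 / self.relocations as f64
        }
    }
}
```

A rising mean replay window with a flat relocation count usually points at checkpoint lag rather than at an unstable node.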
Variations
- Multi-writer stages: shard by keyspace, one pipeline per shard
- Replicated queues/topics: use a logical replicated resource for cross-domain stream availability, with local queue handoff as an implementation detail underneath it