
Recipe 19: Self-Healing Pipeline That Moves Mid-Flight

Situation

You want a pipeline to keep running through failures without a full restart.

Recipe 8 provides the minimal “rewiring” primitive. This recipe builds a larger story around it, a pipeline that:

  • relocates queue/buffer state
  • keeps durable offsets
  • coordinates downstream drains using generation markers

The goal here is “unexpectedly cool” operational behavior: you can watch the pipeline bend around a broken node and keep moving, without an external orchestrator doing topology surgery.

Cross-Domain Boundary

This recipe is about local pipeline mobility. QueueLocator identifies a local queue segment, and RelocatableQueueEdge records handoff/rebind across lease generations. That is different from a replicated queue or topic, where the logical stream has a stable resource name, placement policy, replica-set locator, quorum behavior, replicated consumer cursors, and idempotency.

Keep this recipe when you need a queue edge to move inside a local fabric or cell. For cross-domain stream continuation, build a companion design over grafos_replicated::ReplicatedFabricQueue or grafos_replicated::ReplicatedTopic; local queue handoffs can be recorded as recovery evidence, but they do not advance the logical resource generation by themselves.

What You Build

A pipeline that:

  • checkpoints offsets and commit markers via CheckpointedStageState
  • relocates stage endpoints on failure via RelocatableQueueEdge
  • uses FenceEpoch generation markers to prevent stale reads/writes
  • coordinates downstream rebinding via HandoffWriter / HandoffReader

Building Blocks

  • grafos_fence::FenceEpoch
  • grafos_pipeline::{RelocatableQueueEdge, CheckpointedStageState}
  • grafos_locator::{QueueLocator, HandoffWriter, HandoffReader}
  • FabricQueue for inter-stage buffering
  • grafos_observe tracing

Design

Generation Markers via FenceEpoch

FenceEpoch is the universal fence for topology changes. Each queue edge carries a generation:

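use grafos_fence::FenceEpoch;

// `gen` is a reserved keyword in the Rust 2024 edition, so use another name.
let edge_gen = FenceEpoch::new(1);
// After relocation, bump the generation.
let new_gen = edge_gen.bump();
// Consumers reject stale generations. `msg_gen` is the generation carried by
// an incoming message; `current_gen` is the consumer's current view.
if msg_gen.is_stale(&current_gen) {
    // Discard the stale message.
}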

This is the same idea as the fenced leader epoch in Recipe 4, applied to pipeline edges.
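
To make the stale check concrete, here is a minimal self-contained sketch. `Epoch`, `Tagged`, and `accept_current` are hypothetical stand-ins written for this example; they assume that "stale" means "strictly older than the current generation" and are not the actual grafos_fence types.

```rust
/// Hypothetical stand-in for a fence epoch; the real FenceEpoch may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Epoch(u64);

impl Epoch {
    pub fn new(value: u64) -> Self {
        Epoch(value)
    }

    /// Returns the next generation; `self` is left unchanged.
    pub fn bump(self) -> Self {
        Epoch(self.0 + 1)
    }

    pub fn value(self) -> u64 {
        self.0
    }

    /// Assumption: a generation is stale when strictly older than `current`.
    pub fn is_stale(self, current: &Epoch) -> bool {
        self.0 < current.0
    }
}

/// A message tagged with the generation of the edge it was written to.
pub struct Tagged<T> {
    pub epoch: Epoch,
    pub payload: T,
}

/// Keeps only the payloads whose generation is not stale against `current`.
pub fn accept_current<T>(msgs: Vec<Tagged<T>>, current: Epoch) -> Vec<T> {
    msgs.into_iter()
        .filter(|m| !m.epoch.is_stale(&current))
        .map(|m| m.payload)
        .collect()
}
```

A consumer that has rebound to generation 2 would call `accept_current(batch, Epoch::new(2))` and silently drop anything still tagged with generation 1.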

RelocatableQueueEdge

RelocatableQueueEdge wraps a FabricQueue and adds generation tracking and relocation:

  • push(item) / pop() — normal queue operations
  • relocate() — allocates a new queue, publishes the new locator, bumps generation
  • generation() — returns the current FenceEpoch

When a queue’s lease expires or the connection drops, the producer calls relocate() and resumes pushing to the new queue.
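
The relocate-and-resume behavior can be sketched with an in-memory stand-in. `SketchEdge`, `EdgeError`, `expire_lease`, and `push_with_relocate` are hypothetical names for this example only. The sketch assumes the old queue's contents are lost on relocation, so anything still in flight has to be replayed from a checkpoint.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum EdgeError {
    LeaseExpired,
}

/// In-memory stand-in for a relocatable queue edge (not the grafos type).
pub struct SketchEdge<T> {
    queue: VecDeque<T>,
    generation: u64,
    lease_live: bool,
}

impl<T> SketchEdge<T> {
    pub fn new() -> Self {
        SketchEdge { queue: VecDeque::new(), generation: 1, lease_live: true }
    }

    /// Fails once the backing lease has expired.
    pub fn push(&mut self, item: T) -> Result<(), EdgeError> {
        if !self.lease_live {
            return Err(EdgeError::LeaseExpired);
        }
        self.queue.push_back(item);
        Ok(())
    }

    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    pub fn generation(&self) -> u64 {
        self.generation
    }

    /// Simulation hook: pretend the lease on the current queue expired.
    pub fn expire_lease(&mut self) {
        self.lease_live = false;
    }

    /// Swaps in a fresh, empty queue and bumps the generation.
    /// Items left in the old queue are dropped in this sketch.
    pub fn relocate(&mut self) {
        self.queue = VecDeque::new();
        self.generation += 1;
        self.lease_live = true;
    }
}

/// Pushes `item`, relocating once and retrying if the lease has expired.
pub fn push_with_relocate<T: Clone>(edge: &mut SketchEdge<T>, item: T) -> Result<(), EdgeError> {
    match edge.push(item.clone()) {
        Err(EdgeError::LeaseExpired) => {
            edge.relocate();
            edge.push(item)
        }
        other => other,
    }
}
```

A real edge would also publish the new locator during `relocate()`; that step is left out here and covered by the handoff pattern.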

Coordinating Drains via Handoff Records

When a stage relocates, downstream consumers need to discover the new queue. The handoff pattern uses block storage as a rendezvous point:

use grafos_locator::locator::QueueLocator;
use grafos_locator::handoff::{HandoffWriter, HandoffReader};

// `writer`, `reader`, `new_lease_id`, `capacity`, and `new_gen` come from the
// surrounding stage setup.

// Producer side: publish the new queue location.
let locator = QueueLocator::new(new_lease_id, 0, capacity, new_gen.value());
writer.publish(&locator)?;

// Consumer side: poll for generation changes.
if let Some(state) = reader.poll()? {
    // `state` contains the new QueueLocator.
    // Rebind to the new queue.
}
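
The rendezvous itself can be modeled as a single shared slot holding the latest locator. In this sketch a mutex-protected `Option` stands in for block storage. `SketchLocator`, `SketchWriter`, `SketchReader`, and `handoff_pair` are hypothetical, and the rule that a publish never moves the generation backwards is an assumption of the example, not a documented property of HandoffWriter.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical locator record carrying only what this sketch needs.
#[derive(Clone, Debug, PartialEq)]
pub struct SketchLocator {
    pub lease_id: u64,
    pub generation: u64,
}

type Slot = Arc<Mutex<Option<SketchLocator>>>;

pub struct SketchWriter {
    slot: Slot,
}

pub struct SketchReader {
    slot: Slot,
    last_seen: u64,
}

/// Creates a writer/reader pair sharing one rendezvous slot.
pub fn handoff_pair() -> (SketchWriter, SketchReader) {
    let slot: Slot = Arc::new(Mutex::new(None));
    (
        SketchWriter { slot: Arc::clone(&slot) },
        SketchReader { slot, last_seen: 0 },
    )
}

impl SketchWriter {
    /// Stores `loc` unless it would move the generation backwards.
    /// Returns whether the record was accepted.
    pub fn publish(&self, loc: SketchLocator) -> bool {
        let mut guard = self.slot.lock().unwrap();
        let newer = guard
            .as_ref()
            .map_or(true, |cur| loc.generation > cur.generation);
        if newer {
            *guard = Some(loc);
        }
        newer
    }
}

impl SketchReader {
    /// Returns the locator only when its generation is newer than the last
    /// one this reader observed.
    pub fn poll(&mut self) -> Option<SketchLocator> {
        let guard = self.slot.lock().unwrap();
        match guard.as_ref() {
            Some(loc) if loc.generation > self.last_seen => {
                self.last_seen = loc.generation;
                Some(loc.clone())
            }
            _ => None,
        }
    }
}
```

Because the reader remembers the last generation it saw, repeated polls are cheap no-ops until the producer publishes a newer record.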

Durable Offsets and Commit Markers

Each stage checkpoints its progress via CheckpointedStageState:

  • input offset / last consumed id
  • output commit marker (what downstream has acknowledged)

If a stage relocates, it resumes from the last committed offset.
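
As a rough illustration of resuming from a checkpoint, the sketch below persists progress every few inputs and reports how many inputs would be replayed after a relocation. `SketchCheckpoint` and `SketchStage` are hypothetical; the fixed checkpoint interval is an assumption of the example, not a CheckpointedStageState feature.

```rust
/// Hypothetical checkpoint record: input progress plus downstream commit marker.
#[derive(Clone, Copy, Debug, PartialEq, Default)]
pub struct SketchCheckpoint {
    pub input_offset: u64,
    pub output_commit: u64,
}

/// A stage that persists its checkpoint every `every` consumed inputs.
pub struct SketchStage {
    durable: SketchCheckpoint,
    consumed: u64,
    every: u64,
}

impl SketchStage {
    /// Starts (or restarts) a stage from the last durable checkpoint.
    pub fn resume(durable: SketchCheckpoint, every: u64) -> Self {
        SketchStage {
            durable,
            consumed: durable.input_offset,
            every: every.max(1), // guard against a zero interval
        }
    }

    /// Records one consumed input and checkpoints on the interval boundary.
    /// `downstream_acked` is what downstream has acknowledged so far.
    pub fn consume_one(&mut self, downstream_acked: u64) {
        self.consumed += 1;
        if self.consumed % self.every == 0 {
            self.durable = SketchCheckpoint {
                input_offset: self.consumed,
                output_commit: downstream_acked,
            };
        }
    }

    pub fn durable(&self) -> SketchCheckpoint {
        self.durable
    }

    /// Inputs that would be replayed if the stage relocated right now.
    pub fn replay_window(&self) -> u64 {
        self.consumed - self.durable.input_offset
    }
}
```

With an interval of 4, a stage that has consumed 10 inputs holds a durable offset of 8, so a relocation at that point replays 2 inputs.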

Define Your Correctness Boundary

“Exactly once” is not a single knob; you choose where you pay:

  • at-least-once between stages (simpler)
  • exactly-once at the final sink (common and practical)

This recipe assumes:

  • stages may replay a small window after relocation
  • the sink is idempotent (or deduplicates using a stable event id)
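
One way to satisfy the second assumption is a sink that deduplicates on a stable event id. `DedupSink` is a hypothetical name, and the unbounded in-memory id set is a simplification for the example.

```rust
use std::collections::HashSet;

/// Hypothetical sink that applies each event id at most once.
pub struct DedupSink {
    seen: HashSet<u64>,
    applied: Vec<String>,
}

impl DedupSink {
    pub fn new() -> Self {
        DedupSink { seen: HashSet::new(), applied: Vec::new() }
    }

    /// Applies the event unless its id was already seen.
    /// Returns true when the event was applied, false for a duplicate.
    pub fn apply(&mut self, event_id: u64, payload: &str) -> bool {
        if !self.seen.insert(event_id) {
            return false; // replayed event: ignore
        }
        self.applied.push(payload.to_string());
        true
    }

    /// Payloads applied so far, in arrival order.
    pub fn applied(&self) -> &[String] {
        &self.applied
    }
}
```

A production sink would need to bound or persist the set of seen ids, for example by keeping only ids newer than the last durable checkpoint.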

Walkthrough

1. Steady-State Run

Producer pushes items through RelocatableQueueEdge. Consumer pops and processes. Both check generation markers on every operation.

2. Inject Node Failure

The queue’s lease expires. push() returns an error.

3. Stage Relocates Output

// Producer detects failure and relocates
edge.relocate()?;
// New queue allocated, new locator published via HandoffWriter
// Generation bumped: consumers will see the change

4. Downstream Rebinding

// Consumer polls HandoffReader
if let Some(state) = handoff_reader.poll()? {
    // New generation detected: rebind to the new queue
    consumer.rebind(state)?;
}

5. Resume Throughput

Consumer resumes from its last checkpointed offset. Any messages in the replay window are handled by sink idempotency.
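
The five steps can be compressed into a small simulation: a consumer processes events, checkpoints periodically, fails once, resumes from its checkpoint, and relies on an idempotent sink. `run_with_one_failure` and its parameters are hypothetical. It models only offsets and event ids, not real queues or leases.

```rust
use std::collections::BTreeSet;

/// Runs `total` events through a consumer that checkpoints every `every`
/// events, fails once when it reaches offset `fail_after`, and resumes from
/// its last checkpoint. Returns (deliveries to the sink, distinct events applied).
pub fn run_with_one_failure(total: u64, every: u64, fail_after: u64) -> (u64, u64) {
    let every = every.max(1);
    let mut applied: BTreeSet<u64> = BTreeSet::new(); // idempotent sink keyed by event id
    let mut delivered = 0u64;
    let mut checkpoint = 0u64; // durable offset to resume from
    let mut offset = 0u64;
    let mut failed = false;

    while offset < total {
        if !failed && offset == fail_after {
            // Steps 2-4: failure detected, edge relocated, consumer rebinds.
            failed = true;
            offset = checkpoint; // step 5: resume from the checkpoint
            continue;
        }
        applied.insert(offset); // duplicates are absorbed by the set
        delivered += 1;
        offset += 1;
        if offset % every == 0 {
            checkpoint = offset;
        }
    }
    (delivered, applied.len() as u64)
}
```

For example, `run_with_one_failure(10, 4, 6)` delivers 12 events to the sink because offsets 4 and 5 are replayed, yet exactly 10 distinct events are applied.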

Failure Modes

  • Stage crashes: output queue lease may expire; downstream sees Disconnected/LeaseExpired and rebinds.
  • Split brain: two producers write to two queues; FenceEpoch fencing prevents consumers from accepting the stale stream.
  • Checkpoint lag: the replay window grows with the interval between checkpoints; checkpoint more often to shrink it.

Observability

Track:

  • relocation count per stage
  • time-to-recover after Disconnected
  • replay window size (events replayed after relocation)
  • end-to-end latency distribution before/after failover
  • current generation per edge (edge.generation().value())
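
A minimal way to hold several of these numbers per edge is a plain counter struct. `EdgeMetrics` is hypothetical and does not use grafos_observe; how the values are exported is left open.

```rust
use std::time::Duration;

/// Hypothetical per-edge counters for relocation behavior.
#[derive(Debug, Default)]
pub struct EdgeMetrics {
    pub relocations: u64,
    pub events_replayed: u64,
    pub current_generation: u64,
    recoveries: Vec<Duration>,
}

impl EdgeMetrics {
    /// Records one completed relocation and its recovery cost.
    pub fn record_relocation(
        &mut self,
        new_generation: u64,
        time_to_recover: Duration,
        replayed: u64,
    ) {
        self.relocations += 1;
        self.events_replayed += replayed;
        self.current_generation = new_generation;
        self.recoveries.push(time_to_recover);
    }

    /// Longest observed time-to-recover, if any relocation has happened.
    pub fn worst_recovery(&self) -> Option<Duration> {
        self.recoveries.iter().copied().max()
    }
}
```

The caller would invoke `record_relocation` once the consumer has rebound, passing the elapsed time since the first Disconnected error and the number of events replayed.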

Variations

  • Multi-writer stages: shard by keyspace, one pipeline per shard
  • Replicated queues/topics: use a logical replicated resource for cross-domain stream availability, with local queue handoff as an implementation detail underneath it