Recipe 8: A Stream Pipeline That Rewires Around Failures
Situation
You have a multi-stage pipeline (ingest -> parse -> enrich -> store). Traditionally, a failure in one stage requires an orchestrator restart or a manual reconfiguration.
In a lease-based world, you want failure recovery to look like normal error handling:
- stage detects output transport failure
- stage reacquires resources elsewhere
- downstream reconnects to the new endpoint
The trick is that you need a stable rendezvous point so downstream stages can find the “current” output queue.
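A minimal sketch of that rendezvous idea, using only the Rust standard library. An `Arc<RwLock<..>>` stands in for the durable handoff record, and `Handoff`, `Rendezvous`, `publish`, and `current` are hypothetical names, not grafOS APIs. What it illustrates is that readers key off a monotonically increasing generation, not the endpoint string.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a durable handoff record: the current endpoint
/// of a stage's output queue plus a monotonically increasing generation.
#[derive(Clone, Debug, PartialEq)]
pub struct Handoff {
    pub generation: u64,
    pub endpoint: String,
}

/// Shared rendezvous point. A real system would persist this record in
/// durable storage; here an Arc<RwLock<..>> models "everyone sees the same record".
#[derive(Clone)]
pub struct Rendezvous {
    record: Arc<RwLock<Handoff>>,
}

impl Rendezvous {
    pub fn new(endpoint: &str) -> Self {
        Rendezvous {
            record: Arc::new(RwLock::new(Handoff {
                generation: 0,
                endpoint: endpoint.to_string(),
            })),
        }
    }

    /// Upstream calls this after reacquiring resources elsewhere.
    /// The generation bump is what tells readers the endpoint moved.
    pub fn publish(&self, endpoint: &str) -> u64 {
        let mut rec = self.record.write().unwrap();
        rec.generation += 1;
        rec.endpoint = endpoint.to_string();
        rec.generation
    }

    /// Downstream reads a consistent (generation, endpoint) snapshot.
    pub fn current(&self) -> Handoff {
        self.record.read().unwrap().clone()
    }
}
```

Comparing generations rather than endpoints matters because a relocated stage could land on an endpoint name that was used before; a monotonic counter never repeats.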
What You Build
A pipeline where:
- boundaries are FabricQueue instances backed by leased memory
- progress is checkpointed with Durable
- stage endpoints are published through a durable “handoff record” with a generation counter
This complements Recipe 19 (which is a larger “moves mid-flight” story) by focusing on the minimal rewiring primitive.
Cross-Domain Boundary
This recipe is intentionally lease-local. FabricQueue, Durable, and
RelocatableQueueEdge let a stage recover from a failed local queue
segment, but they do not create a cross-zone, cross-region, or
cross-provider replicated stream by themselves.
Use this recipe when the problem is local handoff/rebind. Use replicated
queue, topic, or log resources when the problem is logical
stream availability across failure domains. In that design, local
FabricQueue segments may still exist underneath the replicated resource,
but the user-facing contract comes from PlacementPolicy,
ReplicaPolicy, replicated cursors, idempotency, and resource
observations.
Building Blocks
- grafos_collections::queue::FabricQueue
- grafos_collections::durable::Durable
- grafos_std::mem::MemBuilder
- grafos_std::block::BlockBuilder
- grafos_stream::Pipeline for declarative pipeline construction (source)
- grafos_pipeline::RelocatableQueueEdge for queue edges that relocate on failure (source)
- grafos_fence::FenceEpoch for generation tracking (source)
Related API docs:
- grafos-collections guide
- FabricQueue implementation (source)
- Durable implementation (source)
- grafos-std memory API (source)
Design
Pipeline Construction
Use grafos_stream::Pipeline for declarative pipeline assembly:
```rust
use grafos_stream::Pipeline;

// `ingest_source`, `parse`, and `store_sink` are application-defined.
Pipeline::from_source(ingest_source)
    .map(|raw| parse(raw))
    .filter(|record| record.is_valid())
    .sink(store_sink)
    .run()?;
```

Under the hood, inter-stage boundaries are FabricQueue instances backed by leased memory.
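To make the “inter-stage boundary” idea concrete without depending on grafOS, the following std-only sketch runs each stage on its own thread and connects the stages with bounded `std::sync::mpsc::sync_channel` queues as stand-ins for FabricQueue. It illustrates the topology only; it is not how grafos_stream::Pipeline is implemented, and `run_pipeline` is a hypothetical name.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Three stages: source -> parse/filter -> sink.
/// Each boundary is a bounded channel standing in for a leased queue.
pub fn run_pipeline(raw: Vec<String>) -> Vec<i64> {
    let (to_parse, parse_in): (SyncSender<String>, Receiver<String>) = sync_channel(16);
    let (to_sink, sink_in): (SyncSender<i64>, Receiver<i64>) = sync_channel(16);

    // Source stage: emits raw records; dropping the sender signals end of stream.
    let source = thread::spawn(move || {
        for r in raw {
            if to_parse.send(r).is_err() {
                break;
            }
        }
    });

    // Parse + filter stage: keep only records that parse as integers.
    let parser = thread::spawn(move || {
        for r in parse_in {
            if let Ok(v) = r.trim().parse::<i64>() {
                if to_sink.send(v).is_err() {
                    break;
                }
            }
        }
    });

    // Sink stage: runs on the calling thread until every sender is dropped.
    let out: Vec<i64> = sink_in.iter().collect();
    source.join().unwrap();
    parser.join().unwrap();
    out
}
```

Bounded channels give the same backpressure property a fixed-capacity queue does: a fast source blocks instead of growing memory without limit.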
Relocatable Queue Edges
Instead of ad-hoc handoff records, use RelocatableQueueEdge from grafos_pipeline:
- wraps a FabricQueue with a locator and a FenceEpoch generation counter
- on failure, call .relocate() to acquire a new lease, create a fresh queue, bump the generation, and publish a new locator
- downstream detects the generation change via FenceEpoch::is_stale() and rebinds
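The relocate steps in the list above can be modelled in plain Rust. This is an assumption-laden model, not the grafos_pipeline implementation: `Edge`, `Epoch`, and the `(generation, segment id)` locator tuple are hypothetical, a `VecDeque` plays the leased queue, and a shared `Mutex` plays the published locator. `Epoch::is_stale` here means “this value is older than the latest one”, which may not match the argument order of the real FenceEpoch::is_stale.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical totally ordered generation, mirroring the idea of FenceEpoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Epoch(pub u64);

impl Epoch {
    /// `self` is stale if a newer epoch has been observed.
    pub fn is_stale(&self, latest: &Epoch) -> bool {
        self.0 < latest.0
    }
}

/// Hypothetical relocatable edge: a queue segment, the generation it was
/// created under, and a shared published locator (generation, segment id).
pub struct Edge {
    segment: VecDeque<Vec<u8>>,
    generation: Epoch,
    next_segment_id: u64,
    published: Arc<Mutex<(Epoch, u64)>>,
}

impl Edge {
    pub fn new(published: Arc<Mutex<(Epoch, u64)>>) -> Self {
        *published.lock().unwrap() = (Epoch(0), 0);
        Edge { segment: VecDeque::new(), generation: Epoch(0), next_segment_id: 1, published }
    }

    pub fn push(&mut self, item: Vec<u8>) {
        self.segment.push_back(item);
    }

    pub fn len(&self) -> usize {
        self.segment.len()
    }

    /// Model of relocate(): abandon the failed segment, create a fresh one,
    /// bump the generation, and publish the new locator.
    pub fn relocate(&mut self) -> Epoch {
        self.segment = VecDeque::new(); // items in the dead segment are lost here
        self.generation = Epoch(self.generation.0 + 1);
        let id = self.next_segment_id;
        self.next_segment_id += 1;
        *self.published.lock().unwrap() = (self.generation, id);
        self.generation
    }

    pub fn generation(&self) -> Epoch {
        self.generation
    }
}
```

Note the comment in `relocate`: anything still in the failed segment is gone, which is why stages also need input checkpoints and idempotent sinks.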
```rust
use grafos_fence::FenceEpoch;
use grafos_pipeline::RelocatableQueueEdge;

// `queue`, `locator`, and `handoff_reader` are built as in the walkthrough below.
let mut edge = RelocatableQueueEdge::new(queue, locator, handoff_reader);

// Normal path:
edge.push(&item)?;

// On FabricError::Disconnected:
edge.relocate()?; // acquires a new lease and bumps the generation
let generation: FenceEpoch = edge.generation(); // downstream compares this to detect staleness
```

Offsets and Idempotency
To avoid reprocessing:
- each stage checkpoints input offset / last committed id
- sinks should be idempotent or detect duplicates
This recipe is about the rewiring mechanism; end-to-end exactly-once is an application-layer topic.
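The two rules above work together: a stage advances its input checkpoint only after handing a record downstream, so a crash can cause replay but never loss, and an idempotent sink turns that replay into a no-op. A std-only sketch, where `Record`, `IdempotentSink`, `CheckpointedStage`, and the `fail_after` crash hook are all hypothetical and an in-memory `usize` stands in for a Durable checkpoint:

```rust
use std::collections::HashSet;

/// Hypothetical record with a stable, upstream-assigned id.
#[derive(Clone, Debug)]
pub struct Record {
    pub id: u64,
    pub payload: String,
}

/// Idempotent sink: remembers committed ids and ignores replays.
#[derive(Default)]
pub struct IdempotentSink {
    seen: HashSet<u64>,
    pub stored: Vec<Record>,
}

impl IdempotentSink {
    /// Returns true if the record was newly stored, false if it was a duplicate.
    pub fn store(&mut self, rec: Record) -> bool {
        if !self.seen.insert(rec.id) {
            return false;
        }
        self.stored.push(rec);
        true
    }
}

/// Stage that checkpoints the number of input records it has fully processed.
pub struct CheckpointedStage {
    pub checkpoint: usize,
}

impl CheckpointedStage {
    /// Process input[checkpoint..], advancing the checkpoint after each record.
    /// `fail_after` simulates a crash after that many records are delivered
    /// but before the last one is checkpointed.
    pub fn run(&mut self, input: &[Record], sink: &mut IdempotentSink, fail_after: Option<usize>) {
        for (n, rec) in input[self.checkpoint..].iter().enumerate() {
            sink.store(rec.clone());
            if fail_after == Some(n + 1) {
                return; // crashed before the checkpoint write
            }
            self.checkpoint += 1;
        }
    }
}
```

After a simulated crash, the restarted stage replays the one record it never checkpointed, and the sink silently drops the duplicate.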
Walkthrough (Implementation Sketch)
Core grafOS API Path
The edge is a FabricQueue plus a generation-stamped locator handoff:

```rust
use grafos_collections::queue::FabricQueue;
use grafos_fence::FenceEpoch;
use grafos_locator::handoff::{HandoffReader, HandoffState, HandoffWriter};
use grafos_locator::locator::QueueLocator;
use grafos_pipeline::RelocatableQueueEdge;
use grafos_std::block::BlockBuilder;
use grafos_std::mem::MemBuilder;

// Lease memory for the queue and wrap it in a FabricQueue.
let queue_lease = MemBuilder::new().min_bytes(8192).lease_secs(300).acquire()?;
let queue_lease_id = queue_lease.lease_id();
let queue: FabricQueue<Vec<u8>> = FabricQueue::new(queue_lease, 16, 256)?;
let locator = QueueLocator::new(queue_lease_id, 0, 1);

// Lease block storage for the durable handoff record and publish generation zero.
let handoff_lease = BlockBuilder::new().min_blocks(64).lease_secs(600).acquire()?;
let initial = HandoffState {
    stage_id: 1,
    generation: FenceEpoch::zero(),
    locator: locator.clone(),
};
let mut writer = HandoffWriter::new(initial, handoff_lease);
writer.publish(locator.clone())?;

// Turn the writer back into its block lease and open a reader over it.
let reader_lease = writer.into_block_lease();
let reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
let mut edge = RelocatableQueueEdge::new(queue, locator, reader);

edge.push(&b"event-1".to_vec())?;
let item = edge.pop()?;
# let _ = item;
# Ok::<(), grafos_pipeline::EdgeError>(())
```

When push or pop sees a recoverable lease error, RelocatableQueueEdge polls the handoff reader and rebases the queue handle to the newer locator generation.
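The rebase-on-error behaviour described above can be sketched as follows. This is a simplified single-threaded model, not the RelocatableQueueEdge source: `Fabric` maps hypothetical segment ids to queues (a removed id models a lost lease), `HandoffRec` is the published `(generation, segment)` pair, and `Reader::pop` rebinds at most once, and only when the handoff carries a strictly newer generation.

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Debug, PartialEq)]
pub enum EdgeErr {
    Disconnected,
}

/// Hypothetical fabric: queue segments by id; a missing id models a lost lease.
#[derive(Default)]
pub struct Fabric {
    pub segments: HashMap<u64, VecDeque<u32>>,
}

/// Hypothetical handoff record: which segment is current, under which generation.
pub struct HandoffRec {
    pub generation: u64,
    pub segment: u64,
}

pub struct Reader {
    generation: u64,
    segment: u64,
}

impl Reader {
    pub fn bind(h: &HandoffRec) -> Self {
        Reader { generation: h.generation, segment: h.segment }
    }

    fn try_pop(&self, fabric: &mut Fabric) -> Result<Option<u32>, EdgeErr> {
        match fabric.segments.get_mut(&self.segment) {
            Some(q) => Ok(q.pop_front()),
            None => Err(EdgeErr::Disconnected),
        }
    }

    /// Pop; on a recoverable error, poll the handoff and rebase once
    /// if upstream has published a newer generation.
    pub fn pop(&mut self, fabric: &mut Fabric, handoff: &HandoffRec) -> Result<Option<u32>, EdgeErr> {
        match self.try_pop(fabric) {
            Err(EdgeErr::Disconnected) if handoff.generation > self.generation => {
                self.generation = handoff.generation;
                self.segment = handoff.segment;
                self.try_pop(fabric)
            }
            other => other,
        }
    }

    pub fn generation(&self) -> u64 {
        self.generation
    }
}
```

Refusing to rebind when the generation has not advanced keeps a reader from spinning on a dead segment before upstream has actually published a replacement.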
1. Build Stage Queues
Each stage has:
- input: a RelocatableQueueEdge pointing at the upstream stage’s output
- output: a RelocatableQueueEdge wrapping a FabricQueue in a leased arena
On startup, the stage reads the upstream edge’s locator to find its input queue.
2. On Disconnected: Relocate the Edge
When push returns FabricError::Disconnected:
```rust
edge.relocate()?;
// Internally: acquires a new MemLease, creates a new FabricQueue,
// bumps the FenceEpoch generation, and publishes the new locator.
```

No manual lease management or generation bookkeeping is needed: RelocatableQueueEdge handles it.
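Seen from the upstream stage, this makes failure recovery ordinary error handling. A minimal sketch of the surrounding retry loop, assuming a hypothetical `OutputEdge` trait and `FabricErr` enum in place of the real grafOS error types: recoverable errors trigger a relocation and a retry, bounded by `max_relocations`, and anything else is returned to the caller.

```rust
/// Hypothetical error kinds; only the first two are treated as recoverable.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FabricErr {
    Disconnected,
    LeaseExpired,
    Full, // not recoverable by relocation; returned to the caller as-is
}

/// Hypothetical output edge interface standing in for RelocatableQueueEdge.
pub trait OutputEdge {
    fn push(&mut self, item: &[u8]) -> Result<(), FabricErr>;
    fn relocate(&mut self) -> Result<(), FabricErr>;
}

/// Push, relocating on Disconnected/LeaseExpired and retrying, at most
/// `max_relocations` times. Returns how many relocations were needed.
pub fn push_with_relocation<E: OutputEdge>(
    edge: &mut E,
    item: &[u8],
    max_relocations: u32,
) -> Result<u32, FabricErr> {
    let mut relocations = 0;
    loop {
        match edge.push(item) {
            Ok(()) => return Ok(relocations),
            Err(FabricErr::Disconnected) | Err(FabricErr::LeaseExpired)
                if relocations < max_relocations =>
            {
                edge.relocate()?;
                relocations += 1;
            }
            Err(e) => return Err(e),
        }
    }
}
```

The bound matters: if relocation keeps landing on failing resources, the stage should surface the error rather than loop forever.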
3. Downstream Rebinds on Generation Change
Downstream checks the edge’s generation (a FenceEpoch):
```rust
if edge.generation().is_stale(&my_cached_generation) {
    // Rebind: re-read the locator and reconstruct the input queue handle.
    my_cached_generation = edge.generation();
}
```

Failure Modes
- Disconnected: triggers rebuild and rebinding.
- LeaseExpired: treat as “queue gone”; rebuild.
- Partial writes: ensure the queue implementation is robust to partial updates (or treat them as corruption and rebuild).
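One way to make the partial-write case detectable, offered as an assumption rather than a description of how FabricQueue stores items, is to frame each item with its length and a checksum, so a torn write shows up as a short or mismatched frame that the reader can treat as corruption. The frame layout, `encode`, `decode`, and the choice of 32-bit FNV-1a are all illustrative:

```rust
/// Hypothetical frame layout: [len: u32 LE][checksum: u32 LE][payload].

/// FNV-1a 32-bit: simple and deterministic, not cryptographic.
fn checksum(data: &[u8]) -> u32 {
    let mut h: u32 = 0x811c_9dc5;
    for &b in data {
        h ^= b as u32;
        h = h.wrapping_mul(0x0100_0193);
    }
    h
}

pub fn encode(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(8 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(&checksum(payload).to_le_bytes());
    out.extend_from_slice(payload);
    out
}

#[derive(Debug, PartialEq)]
pub enum FrameError {
    Truncated,
    Corrupt,
}

/// Decode one frame; the caller treats any error as "segment corrupt: rebuild".
pub fn decode(frame: &[u8]) -> Result<&[u8], FrameError> {
    if frame.len() < 8 {
        return Err(FrameError::Truncated);
    }
    let len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    let sum = u32::from_le_bytes([frame[4], frame[5], frame[6], frame[7]]);
    // A payload shorter than the header claims means the write was torn.
    let payload = frame[8..].get(..len).ok_or(FrameError::Truncated)?;
    if checksum(payload) != sum {
        return Err(FrameError::Corrupt);
    }
    Ok(payload)
}
```

FNV-1a catches accidental damage such as a torn or bit-flipped write; it is not a defence against deliberate tampering.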
Observability
Track:
- stage restarts / rebinds
- generation changes
- throughput dip during failover
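A std-only sketch of those three signals, assuming hypothetical names (`FailoverMetrics`, `observe_generation`, `throughput`) and plain atomic counters rather than any particular metrics library. A generation change is counted only when the observed value moves (starting from generation 0), so repeated observations of the same generation do not inflate the count; throughput is computed per sampling window so a failover appears as a dip between windows.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical per-stage failover metrics, safe to share across threads.
#[derive(Default)]
pub struct FailoverMetrics {
    pub rebinds: AtomicU64,
    pub generation_changes: AtomicU64,
    last_generation: AtomicU64, // starts at generation 0
    pub items: AtomicU64,
}

impl FailoverMetrics {
    pub fn on_item(&self) {
        self.items.fetch_add(1, Ordering::Relaxed);
    }

    pub fn on_rebind(&self) {
        self.rebinds.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a generation change only when the observed generation actually moves.
    pub fn observe_generation(&self, generation: u64) {
        let prev = self.last_generation.swap(generation, Ordering::Relaxed);
        if prev != generation {
            self.generation_changes.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Items per second over one sampling window.
pub fn throughput(items_in_window: u64, window: Duration) -> f64 {
    items_in_window as f64 / window.as_secs_f64()
}
```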
Variations
- use watch() for faster notification (needs shared-memory visibility)
- replicated queue/topic/log for higher availability across explicit failure domains
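The watch() variation replaces polling with notification. As a rough in-process analogue (not the grafOS watch() API), a `Mutex<u64>` paired with a `Condvar` lets downstream block until the generation advances past the one it last saw; `GenerationWatch`, `shared`, `bump`, and `wait_newer` are hypothetical names.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical generation watch: upstream bumps and notifies; downstream
/// blocks until the generation moves past what it last saw, or times out.
#[derive(Default)]
pub struct GenerationWatch {
    generation: Mutex<u64>,
    changed: Condvar,
}

impl GenerationWatch {
    /// Convenience constructor for sharing one watch between stages.
    pub fn shared() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Upstream calls this after relocating; wakes every waiting reader.
    pub fn bump(&self) -> u64 {
        let mut g = self.generation.lock().unwrap();
        *g += 1;
        self.changed.notify_all();
        *g
    }

    /// Returns Some(new_generation) once it exceeds `seen`, or None on timeout.
    pub fn wait_newer(&self, seen: u64, timeout: Duration) -> Option<u64> {
        let guard = self.generation.lock().unwrap();
        let (guard, res) = self
            .changed
            .wait_timeout_while(guard, timeout, |g| *g <= seen)
            .unwrap();
        if res.timed_out() {
            None
        } else {
            Some(*guard)
        }
    }
}
```

The timeout keeps a waiter from blocking indefinitely when upstream never relocates (for example, because upstream itself has failed); the stage can then fall back to re-reading the handoff record or reporting an error.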