Recipe 48: Stream Pipeline
What You Build
A stream pipeline — Source produces events, Transform processes them, Sink persists them — with grafos-stream’s FabricQueue-backed inter-stage buffers. All three stages run on different cells; the queues live in leased memory; lease expiry on a queue is what backs flow control and crash recovery.
Source
cookbook/recipe-48-stream-pipeline/ in the source tree.
The recipe is the host-testable Transform stage: receive a JSON batch of { key, value } events, return a JSON batch with each value doubled, preserving order.
Core grafOS API Path
The inter-stage buffers are FabricQueue<T> values over leased memory. The
explicit construction path is: acquire memory, construct the queue, push from
the source side, pop on the transform side, then push to the outbound queue.
use grafos_collections::queue::FabricQueue;use grafos_std::mem::MemBuilder;
let capacity = 8usize;let stride = 128usize;let bytes = 32 + capacity as u64 * stride as u64;let inbound_lease = MemBuilder::new().min_bytes(bytes).lease_secs(300).acquire()?;let outbound_lease = MemBuilder::new().min_bytes(bytes).lease_secs(300).acquire()?;
let mut inbound = FabricQueue::<Event>::new(inbound_lease, capacity, stride)?;let mut outbound = FabricQueue::<Event>::new(outbound_lease, capacity, stride)?;
inbound.push(&Event { key: "a".into(), value: 3 })?;if let Some(mut event) = inbound.pop()? { event.value = event.value.saturating_mul(2); if !outbound.push(&event)? { return Err(grafos_std::FabricError::CapacityExceeded); }}# Ok::<(), grafos_std::FabricError>(())The recipe helper applies that operation repeatedly up to a batch limit:
pub fn compute(input: &[u8]) -> Result<TransformOutput, &'static str> { let parsed: TransformInput = serde_json::from_slice(input).map_err(|_| "invalid_input")?; let events_seen = parsed.events.len(); let events: Vec<Event> = parsed.events.into_iter().map(|e| Event { key: e.key, value: e.value.saturating_mul(2), }).collect(); Ok(TransformOutput { events, events_seen })}
pub fn transform_queue( inbound: &mut FabricQueue<Event>, outbound: &mut FabricQueue<Event>, max_batch: usize,) -> FabricResult<usize> { let mut processed = 0; while processed < max_batch { let Some(mut event) = inbound.pop()? else { break; }; event.value = event.value.saturating_mul(2); if !outbound.push(&event)? { return Err(FabricError::CapacityExceeded); } processed += 1; } Ok(processed)}What’s interesting
- Order preservation. A real Transform stage that breaks order causes downstream stages to see events out of sequence and produce wrong output. The test pins ordering.
- Saturating arithmetic. Events at
i64::MAXdon’t panic; they cap. Programs see deterministic overflow rather than runtime traps. - Empty batches OK. A Transform that handles zero events as the identity transformation is well-behaved when the upstream Source has nothing to push yet.
- Real fabric path.
transform_queuemoves events from oneFabricQueue<Event>to another through the public queue API.
Failure Behavior
- Invalid JSON returns
invalid_inputfrom the tasklet core. - Empty batches are accepted and return
events_seen = 0. - Full outbound queues return
CapacityExceeded; the caller should retry after downstream capacity drains or admission adds capacity. - Saturating arithmetic makes overflow deterministic instead of trapping the tasklet.
Run And Verify
cargo test -p cookbook-recipe-48-stream-pipelineExpected: the tests cover ordered transformation, empty batches, saturating arithmetic, and the real FabricQueue helper path.
Adapt It
Change max_batch, queue capacity, and queue stride for your throughput and payload size. Keep transformations idempotent when upstream stages may retry after a failed push.