
Recipe 48: Stream Pipeline

What You Build

A three-stage stream pipeline: Source produces events, Transform processes them, and Sink persists them, with grafos-stream’s FabricQueue-backed buffers between stages. Each stage runs on a different cell, the queues live in leased memory, and lease expiry on a queue is what backs flow control and crash recovery.

Source

The recipe lives in cookbook/recipe-48-stream-pipeline/ in the source tree.

The recipe implements the host-testable Transform stage: it receives a JSON batch of { key, value } events and returns a JSON batch with each value doubled, preserving order.

Core grafOS API Path

The inter-stage buffers are FabricQueue<T> values over leased memory. The explicit construction path is: acquire memory, construct the queue, push from the source side, pop on the transform side, then push to the outbound queue.

use grafos_collections::queue::FabricQueue;
use grafos_std::mem::MemBuilder;

let capacity = 8usize;
let stride = 128usize;
// Size each lease: a fixed 32 bytes plus capacity * stride.
let bytes = 32 + capacity as u64 * stride as u64;

// One lease per queue, each held for 300 seconds.
let inbound_lease = MemBuilder::new().min_bytes(bytes).lease_secs(300).acquire()?;
let outbound_lease = MemBuilder::new().min_bytes(bytes).lease_secs(300).acquire()?;
let mut inbound = FabricQueue::<Event>::new(inbound_lease, capacity, stride)?;
let mut outbound = FabricQueue::<Event>::new(outbound_lease, capacity, stride)?;

// Source side: enqueue one event.
inbound.push(&Event { key: "a".into(), value: 3 })?;

// Transform side: dequeue, double, and forward.
if let Some(mut event) = inbound.pop()? {
    event.value = event.value.saturating_mul(2);
    // push returns Ok(false) when the outbound queue is full.
    if !outbound.push(&event)? {
        return Err(grafos_std::FabricError::CapacityExceeded);
    }
}
# Ok::<(), grafos_std::FabricError>(())
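The byte budget in the snippet above can be checked by hand. The following is a minimal sketch of that arithmetic with overflow checking, not part of the recipe: queue_bytes and QUEUE_HEADER_BYTES are hypothetical names, and the meaning of the literal 32 (presumably fixed queue overhead) is an assumption.

```rust
/// The literal 32 from the recipe snippet. What it covers is an assumption;
/// the name is hypothetical.
pub const QUEUE_HEADER_BYTES: u64 = 32;

/// Computes the number of bytes to request for a queue with `capacity`
/// entries of `stride` bytes each. Returns None if the arithmetic overflows.
pub fn queue_bytes(capacity: usize, stride: usize) -> Option<u64> {
    let slots = (capacity as u64).checked_mul(stride as u64)?;
    QUEUE_HEADER_BYTES.checked_add(slots)
}
```

With the recipe's values, queue_bytes(8, 128) yields Some(1056), that is 32 + 8 * 128. The checked operations only matter for very large capacity or stride values, where the unchecked expression would overflow.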

The recipe exposes two helpers: compute transforms a JSON batch in memory, and transform_queue applies the same doubling across queues, repeating up to a batch limit:

/// Host-testable core: parse a JSON batch, double every value, keep the order.
pub fn compute(input: &[u8]) -> Result<TransformOutput, &'static str> {
    let parsed: TransformInput = serde_json::from_slice(input).map_err(|_| "invalid_input")?;
    let events_seen = parsed.events.len();
    let events: Vec<Event> = parsed.events.into_iter().map(|e| Event {
        key: e.key,
        value: e.value.saturating_mul(2),
    }).collect();
    Ok(TransformOutput { events, events_seen })
}

/// Fabric path: move up to max_batch events from inbound to outbound,
/// doubling each value. Returns the number of events moved.
pub fn transform_queue(
    inbound: &mut FabricQueue<Event>,
    outbound: &mut FabricQueue<Event>,
    max_batch: usize,
) -> FabricResult<usize> {
    let mut processed = 0;
    while processed < max_batch {
        // Stop early once the inbound queue is empty.
        let Some(mut event) = inbound.pop()? else { break; };
        event.value = event.value.saturating_mul(2);
        // A full outbound queue aborts the batch.
        if !outbound.push(&event)? {
            return Err(FabricError::CapacityExceeded);
        }
        processed += 1;
    }
    Ok(processed)
}
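To experiment with the batch loop without leased memory, the same loop shape can be run against an in-process queue. This is a sketch under stated assumptions: LocalQueue and transform_local are hypothetical stand-ins built on std's VecDeque, not the FabricQueue API, and the string error replaces FabricError::CapacityExceeded.

```rust
use std::collections::VecDeque;

#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub key: String,
    pub value: i64,
}

/// Hypothetical in-process stand-in for a bounded queue. NOT FabricQueue:
/// no leased memory, no stride, no fallible I/O.
pub struct LocalQueue {
    items: VecDeque<Event>,
    capacity: usize,
}

impl LocalQueue {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { items: VecDeque::with_capacity(capacity), capacity }
    }

    /// Stores a copy of the event; returns false when the queue is full.
    pub fn push(&mut self, event: &Event) -> bool {
        if self.items.len() >= self.capacity {
            return false;
        }
        self.items.push_back(event.clone());
        true
    }

    /// Removes and returns the oldest event, if any.
    pub fn pop(&mut self) -> Option<Event> {
        self.items.pop_front()
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}

/// Same loop shape as transform_queue, against the stand-in queue.
pub fn transform_local(
    inbound: &mut LocalQueue,
    outbound: &mut LocalQueue,
    max_batch: usize,
) -> Result<usize, &'static str> {
    let mut processed = 0;
    while processed < max_batch {
        let Some(mut event) = inbound.pop() else { break };
        event.value = event.value.saturating_mul(2);
        if !outbound.push(&event) {
            return Err("capacity_exceeded");
        }
        processed += 1;
    }
    Ok(processed)
}
```

In this model, a batch limit of 3 over five queued events moves exactly three and leaves two behind for the next call. Note also that the event popped just before a failed push is not put back, so it is gone from the inbound queue when the error returns. Whether callers of the real helper need to guard against that is worth checking against the recipe source.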

What’s interesting

  1. Order preservation. A real Transform stage that breaks order causes downstream stages to see events out of sequence and produce wrong output. The test pins ordering.
  2. Saturating arithmetic. Values that would overflow when doubled, such as i64::MAX, saturate at the bound instead of panicking, so programs see deterministic overflow rather than runtime traps.
  3. Empty batches are accepted. A Transform that treats zero events as a no-op stays well-behaved when the upstream Source has nothing to push yet.
  4. Real fabric path. transform_queue moves events from one FabricQueue<Event> to another through the public queue API.
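The first three properties can be checked on a pure function with no JSON or queue involved. The sketch below is illustrative only: double_all is a hypothetical helper that mirrors the mapping inside compute, and the Event struct is redeclared locally so the block stands alone.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub key: String,
    pub value: i64,
}

/// Doubles every value with saturating arithmetic. The iterator chain visits
/// events front to back, so output order matches input order.
pub fn double_all(events: Vec<Event>) -> Vec<Event> {
    events
        .into_iter()
        .map(|e| Event { key: e.key, value: e.value.saturating_mul(2) })
        .collect()
}

/// Convenience constructor used only in this sketch.
pub fn ev(key: &str, value: i64) -> Event {
    Event { key: key.to_string(), value }
}
```

For example, double_all(vec![ev("a", 3), ev("b", i64::MAX)]) returns the events in the same order with values 6 and i64::MAX, and an empty Vec maps to an empty Vec. The choice of saturating_mul is what makes the overflow case deterministic: wrapping_mul would turn i64::MAX into -2, and plain multiplication panics in debug builds.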

Failure Behavior

  • Invalid JSON returns invalid_input from the tasklet core.
  • Empty batches are accepted and return events_seen = 0.
  • Full outbound queues return CapacityExceeded; the caller should retry after downstream capacity drains or admission adds capacity.
  • Saturating arithmetic makes overflow deterministic instead of trapping the tasklet.
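One way a caller might structure the retry after a full outbound queue is to hold the already-transformed event and push it first on the next attempt. The following sketch uses plain VecDeque values with an explicit capacity instead of FabricQueue. Stage, its pending field, and run are hypothetical names, not part of the recipe.

```rust
use std::collections::VecDeque;

#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub key: String,
    pub value: i64,
}

/// Hypothetical Transform-side state: the one event that was transformed
/// but could not be pushed because the outbound queue was full.
#[derive(Default)]
pub struct Stage {
    pending: Option<Event>,
}

impl Stage {
    /// Moves up to `max_batch` events. Returns (processed, blocked), where
    /// blocked = true means the outbound queue was full and one event is held.
    pub fn run(
        &mut self,
        inbound: &mut VecDeque<Event>,
        outbound: &mut VecDeque<Event>,
        out_capacity: usize,
        max_batch: usize,
    ) -> (usize, bool) {
        let mut processed = 0;
        while processed < max_batch {
            // Retry the held event first so ordering is preserved. It was
            // doubled before being held, so it is not doubled again.
            let event = match self.pending.take() {
                Some(held) => held,
                None => match inbound.pop_front() {
                    Some(mut fresh) => {
                        fresh.value = fresh.value.saturating_mul(2);
                        fresh
                    }
                    None => break,
                },
            };
            if outbound.len() >= out_capacity {
                // Outbound is full: keep the event for the next call.
                self.pending = Some(event);
                return (processed, true);
            }
            outbound.push_back(event);
            processed += 1;
        }
        (processed, false)
    }
}
```

Because the held event is stored after doubling, a retry forwards it unchanged, which keeps the transformation from being applied twice to the same event.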

Run And Verify

cargo test -p cookbook-recipe-48-stream-pipeline

Expected: the tests cover ordered transformation, empty batches, saturating arithmetic, and the real FabricQueue helper path.

Adapt It

Change max_batch, queue capacity, and queue stride for your throughput and payload size. Keep transformations idempotent when upstream stages may retry after a failed push.