stream-pipeline

A stream pipeline — Source produces events, Transform processes them, Sink persists them — with grafos-stream’s FabricQueue-backed inter-stage buffers. All three stages run on different cells; the queues live in leased memory; lease expiry on a queue is what backs flow control and crash recovery.

Source

cookbook/stream-pipeline/ in the source tree.

The recipe is the host-testable Transform stage: receive a JSON batch of { key, value } events, return a JSON batch with each value doubled, preserving order.

pub fn compute(input: &[u8]) -> Result<TransformOutput, &'static str> {
    // Reject malformed JSON with a stable error string instead of panicking.
    let parsed: TransformInput = serde_json::from_slice(input).map_err(|_| "invalid_input")?;
    let events_seen = parsed.events.len();
    // Double each value with saturating arithmetic, keeping the batch order.
    let events: Vec<Event> = parsed.events
        .into_iter()
        .map(|e| Event {
            key: e.key,
            value: e.value.saturating_mul(2),
        })
        .collect();
    Ok(TransformOutput { events, events_seen })
}
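
The recipe's type definitions aren't reproduced above. A minimal sketch of plausible shapes, assuming serde/serde_json with the derive feature, and assuming key is a String and value an i64 (the i64::MAX note below suggests a signed 64-bit value; the actual recipe may differ):

use serde::{Deserialize, Serialize};

// One { key, value } event. Field types are assumptions: the prose only
// names "key" and "value"; the i64::MAX note suggests a signed 64-bit value.
#[derive(Serialize, Deserialize)]
pub struct Event {
    pub key: String,
    pub value: i64,
}

// Inbound batch: a JSON object carrying an "events" array.
#[derive(Deserialize)]
pub struct TransformInput {
    pub events: Vec<Event>,
}

// Outbound batch: transformed events plus a count of events consumed.
#[derive(Serialize)]
pub struct TransformOutput {
    pub events: Vec<Event>,
    pub events_seen: usize,
}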

What’s interesting

  1. Order preservation. In a real pipeline, a Transform stage that breaks order causes downstream stages to see events out of sequence and produce wrong output. The test pins ordering; a test sketch follows this list.
  2. Saturating arithmetic. Doubling a value already at i64::MAX doesn't panic; it caps at i64::MAX. Programs see deterministic overflow behavior rather than runtime traps.
  3. Empty batches OK. A Transform that handles zero events as the identity transformation is well-behaved when the upstream Source has nothing to push yet.
  4. Production drop-in. Wire this into grafos-stream by registering the compute() function as a Transform stage and pointing the inbound and outbound queues at FabricQueue instances on leased memory. The recipe's events slice becomes inbound.recv_batch(n); the output events become outbound.send_batch(events). A rough wiring sketch follows below.
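
Items 1 through 3 can be pinned in host tests against compute() alone, with no grafos-stream runtime involved. A sketch, assuming the type definitions above; the test names and exact assertions are illustrative, not the recipe's actual test file:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn doubles_values_and_preserves_order() {
        let input = br#"{"events":[{"key":"a","value":1},{"key":"b","value":2}]}"#;
        let out = compute(input).unwrap();
        assert_eq!(out.events_seen, 2);
        // Order is pinned: "a" stays first, "b" stays second.
        assert_eq!(out.events[0].key, "a");
        assert_eq!(out.events[0].value, 2);
        assert_eq!(out.events[1].key, "b");
        assert_eq!(out.events[1].value, 4);
    }

    #[test]
    fn saturates_instead_of_overflowing() {
        let input = format!(r#"{{"events":[{{"key":"max","value":{}}}]}}"#, i64::MAX);
        let out = compute(input.as_bytes()).unwrap();
        // Doubling i64::MAX caps at i64::MAX rather than panicking or wrapping.
        assert_eq!(out.events[0].value, i64::MAX);
    }

    #[test]
    fn empty_batch_is_identity() {
        let out = compute(br#"{"events":[]}"#).unwrap();
        assert!(out.events.is_empty());
        assert_eq!(out.events_seen, 0);
    }
}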
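
Item 4 can be sketched a little more concretely. In the snippet below, FabricQueue is a toy in-memory stand-in (a VecDeque) so the example is self-contained; only the recv_batch/send_batch names come from the description above, and grafos-stream's real types, signatures, stage registration, and lease-expiry handling will differ:

use std::collections::VecDeque;

// Stand-in for grafos-stream's FabricQueue, here just an in-memory VecDeque.
// The real queue lives in leased memory; lease expiry backs flow control
// and crash recovery, none of which this toy models.
pub struct FabricQueue<T> {
    items: VecDeque<T>,
}

impl<T> FabricQueue<T> {
    // Pull up to n items from the front of the queue.
    pub fn recv_batch(&mut self, n: usize) -> Vec<T> {
        let take = n.min(self.items.len());
        self.items.drain(..take).collect()
    }

    // Push a batch onto the back of the queue, preserving its order.
    pub fn send_batch(&mut self, batch: Vec<T>) {
        self.items.extend(batch);
    }
}

// One pass of the Transform stage: pull a batch, double values, push downstream.
// An empty batch falls through as the identity transformation (item 3).
pub fn transform_pass(inbound: &mut FabricQueue<Event>, outbound: &mut FabricQueue<Event>) {
    let batch = inbound.recv_batch(64);
    let doubled: Vec<Event> = batch
        .into_iter()
        .map(|e| Event { key: e.key, value: e.value.saturating_mul(2) })
        .collect();
    outbound.send_batch(doubled);
}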