# stream-pipeline
A stream pipeline — Source produces events, Transform processes them, Sink persists them — with `grafos-stream`'s `FabricQueue`-backed inter-stage buffers. All three stages run on different cells; the queues live in leased memory; lease expiry on a queue is what backs flow control and crash recovery.
Source: `cookbook/stream-pipeline/` in the source tree.
The recipe is the host-testable Transform stage: receive a JSON batch of `{ key, value }` events, return a JSON batch with each value doubled, preserving order.
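A minimal sketch of the serde types the function below operates on, inferred from its body; the `String` key type and the exact derive set are assumptions, not taken from the recipe.

```rust
use serde::{Deserialize, Serialize};

/// One { key, value } event. The i64 value matches the i64::MAX
/// saturation behavior discussed below; the String key is assumed.
#[derive(Serialize, Deserialize)]
pub struct Event {
    pub key: String,
    pub value: i64,
}

/// Inbound batch pushed by the Source.
#[derive(Deserialize)]
pub struct TransformInput {
    pub events: Vec<Event>,
}

/// Outbound batch plus a count of events processed.
#[derive(Serialize)]
pub struct TransformOutput {
    pub events: Vec<Event>,
    pub events_seen: usize,
}
```

With those types in place, the recipe's Transform entry point is: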
```rust
pub fn compute(input: &[u8]) -> Result<TransformOutput, &'static str> {
    // Reject malformed JSON up front with a stable error string.
    let parsed: TransformInput =
        serde_json::from_slice(input).map_err(|_| "invalid_input")?;
    let events_seen = parsed.events.len();
    // Double every value, saturating at i64::MAX instead of panicking,
    // and keep the events in their original order.
    let events: Vec<Event> = parsed
        .events
        .into_iter()
        .map(|e| Event {
            key: e.key,
            value: e.value.saturating_mul(2),
        })
        .collect();
    Ok(TransformOutput { events, events_seen })
}
```

## What’s interesting
- **Order preservation.** A real Transform stage that breaks order causes downstream stages to see events out of sequence and produce wrong output. The test pins ordering (a test sketch follows this list).
- **Saturating arithmetic.** Events at `i64::MAX` don’t panic; they cap. Programs see deterministic overflow rather than runtime traps.
- **Empty batches OK.** A Transform that handles zero events as the identity transformation is well-behaved when the upstream Source has nothing to push yet.
- **Production drop-in.** Wire this into `grafos-stream` by registering the `compute()` function as a `Transform` stage and pointing inbound + outbound queues at `FabricQueue` instances on leased memory. The recipe’s `events` slice becomes `inbound.recv_batch(n)`; the output `events` becomes `outbound.send_batch(events)`. A wiring sketch follows the tests below.
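A test sketch pinning the three properties above (ordering, saturation, the empty-batch identity). It assumes the types sketched earlier; the recipe’s actual tests may be organized differently.

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn doubles_values_and_preserves_order() {
        let input = br#"{"events":[{"key":"a","value":1},{"key":"b","value":2}]}"#;
        let out = compute(input).unwrap();
        assert_eq!(out.events_seen, 2);
        // Order is pinned: "a" before "b", both values doubled.
        assert_eq!(out.events[0].key, "a");
        assert_eq!(out.events[0].value, 2);
        assert_eq!(out.events[1].key, "b");
        assert_eq!(out.events[1].value, 4);
    }

    #[test]
    fn saturates_instead_of_panicking() {
        let input = format!(r#"{{"events":[{{"key":"max","value":{}}}]}}"#, i64::MAX);
        let out = compute(input.as_bytes()).unwrap();
        // saturating_mul caps at i64::MAX rather than trapping on overflow.
        assert_eq!(out.events[0].value, i64::MAX);
    }

    #[test]
    fn empty_batch_is_identity() {
        let out = compute(br#"{"events":[]}"#).unwrap();
        assert!(out.events.is_empty());
        assert_eq!(out.events_seen, 0);
    }
}
```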
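And a sketch of the production wiring from the last bullet. Only `recv_batch(n)` and `send_batch(events)` are named above; the import path, the generic `FabricQueue<Event>` signature, and the batch size are assumptions, not `grafos-stream`’s confirmed API.

```rust
// Hypothetical import path and queue signature; the real API may differ.
use grafos_stream::FabricQueue;

/// Transform stage main loop: the recipe's `events` slice is replaced by
/// `inbound.recv_batch(n)`, and the output batch goes to `outbound.send_batch`.
fn run_transform_stage(inbound: FabricQueue<Event>, outbound: FabricQueue<Event>) {
    loop {
        // Batch size 64 is an arbitrary illustration, not a recommended value.
        let batch: Vec<Event> = inbound.recv_batch(64);
        // Same core logic as compute(), minus the JSON framing.
        let doubled: Vec<Event> = batch
            .into_iter()
            .map(|e| Event { key: e.key, value: e.value.saturating_mul(2) })
            .collect();
        outbound.send_batch(doubled);
    }
}
```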