Recipe 31: Event-Driven Pipeline Without Network Hops
Situation
You need a classic event-driven backend: objects land in storage, notifications fire, workers process them. In a conventional setup this means wiring together three separate services — S3 for storage, SQS for queuing, Lambda for compute — with network round-trips, API gateways, and IAM policies between each hop. Moving a 1 KB object from storage to queue to compute crosses three network boundaries before any work happens.
In the fabricBIOS model, storage, queuing, and compute are all leased from the same fabric memory pool. An “S3 event → SQS message → Lambda function” becomes writes to different data structures in the same address space. No network hops between services. No serialization at service boundaries. The object, the notification, and the compute result all live in leased DRAM — and all three vanish when their leases expire.
What You Build
An event pipeline where:
- Object writes go to a
MemObjectStore(leased DRAM). - Each write produces a notification on a
grafos-mqtopic (same leased DRAM pool). - Workers consume notifications, read objects from the store, process them, and write results back.
- All three layers share the same fabric memory pool — zero network hops between stages.
- Lease expiry handles cleanup: abandoned pipelines simply cease to exist.
Building Blocks
grafos_store::{MemObjectStore, ObjectStore, FabricUri}— object storage — sourcegrafos_mq::topic::{TopicManager, TopicConfig}— topic lifecycle — sourcegrafos_mq::producer::Producer— message production — sourcegrafos_mq::consumer::{Consumer, SeekPolicy}— partition-assigned polling — sourcegrafos_mq::group::ConsumerGroup— decentralized partition assignment — sourcegrafos_jobs::{JobCoordinator, RetryPolicy, MemoryOutputStore}— idempotent retry scaffolding — sourcegrafos_leasekit::{RenewalManager, RenewalPolicy}— TTL-driven lease renewal — source
Design
Why One Fabric Changes the Game
In a conventional cloud pipeline, each service boundary is a network call: the storage service serializes a notification, sends it over HTTP to the queue service, the queue service serializes the message, the compute service fetches it over HTTP, then makes another HTTP call back to storage to read the object. Three services, three serialization round-trips, three sets of credentials.
When all three live in the same leased memory pool, the “notification” is a pointer-sized write to a ring buffer that already exists in the same address space. The “fetch” is a hash map lookup. The only real work is the computation itself.
Architecture
Producer (your code) │ ├── store.put(uri, data) → MemObjectStore (FabricHashMap in leased DRAM) │ └── producer.send(uri_bytes) → TopicManager partition (ring buffer in leased DRAM) │ Consumer.poll() │ store.get(uri) → same MemObjectStore │ process(data) │ store.put(result_uri, output)No network between any of these steps. The MemObjectStore and TopicManager partitions are both backed
by FabricHashMap / FabricQueue over the same leased DRAM.
Notification as a Lightweight Pointer
The notification message is just the FabricUri string (e.g. fabric://default/uploads/img-042.png). The
consumer uses it to look up the object in the store. Since the store and queue share the same memory pool,
this “look up” is a hash map get — not a network fetch.
Walkthrough (Implementation Sketch)
1. Create the Store and Notification Topic
use grafos_store::{MemObjectStore, ObjectStore, FabricUri};use grafos_mq::topic::{TopicManager, TopicConfig};use grafos_mq::producer::Producer;
// Object store — 256 hash buckets in leased DRAMlet mut store = MemObjectStore::new(256)?;
// Notification topic — 4 partitions, each a ring bufferlet mut topics = TopicManager::new();topics.create("object-events", TopicConfig { num_partitions: 4, partition_capacity: 128, partition_stride: 512,})?;
let mut notifier = Producer::new("object-events");2. Write an Object and Notify
fn ingest( store: &mut MemObjectStore, topics: &mut TopicManager, notifier: &mut Producer, key: &str, data: &[u8],) -> grafos_std::Result<()> { let uri: FabricUri = format!("fabric://default/uploads/{}", key).parse()?;
// Write to store (hash map insert — no network) store.put(&uri, data, None)?;
// Notify (ring buffer append — no network) notifier.send(topics, uri.to_string().as_bytes())?;
Ok(())}
ingest(&mut store, &mut topics, &mut notifier, "img-042.png", &image_bytes)?;ingest(&mut store, &mut topics, &mut notifier, "img-043.png", &image_bytes)?;Both operations are writes to leased DRAM. No serialization boundary, no HTTP call.
3. Consume Notifications and Process
use grafos_mq::consumer::{Consumer, SeekPolicy};use grafos_mq::group::ConsumerGroup;use grafos_mq::offset::MemOffsetStore;
let mut group = ConsumerGroup::new("processors", "object-events", 4, 30);let claimed = group.claim("worker-1");
let mut offsets = MemOffsetStore::new();let mut consumer = Consumer::new("object-events", "processors");consumer.assign(&claimed);consumer.seek(&topics, &offsets, SeekPolicy::Earliest)?;
let messages = consumer.poll(&topics, 100)?;for (_part_idx, msg) in &messages { // Parse the URI from the notification let uri_str = core::str::from_utf8(&msg.value).unwrap(); let uri: FabricUri = uri_str.parse()?;
// Read the object — hash map lookup, not a network fetch let obj = store.get(&uri)?.expect("object exists");
// Process let result = process_image(&obj.data);
// Write result back to store let result_uri: FabricUri = format!("fabric://default/results/{}", uri.key()).parse()?; store.put(&result_uri, &result, None)?;}consumer.commit(&mut offsets);group.heartbeat("worker-1");4. Orchestrate with JobCoordinator (Optional)
For idempotent processing with retry:
use grafos_jobs::{JobCoordinator, RetryPolicy, MemoryOutputStore};
let mut output_store = MemoryOutputStore::new();let mut coord = JobCoordinator::new(RetryPolicy::default());
let result = coord.run( &chunks, &mut output_store, |chunk_bytes| { let uri: FabricUri = core::str::from_utf8(chunk_bytes) .map_err(|_| grafos_std::FabricError::IoError(-1))? .parse()?; let obj = store.get(&uri)?.ok_or(grafos_std::FabricError::IoError(-2))?; Ok(process_image(&obj.data)) }, |outputs| { // Aggregate: count processed postcard::to_allocvec(&(outputs.len() as u64)).unwrap() },)?;5. Lease Expiry Handles Cleanup
Register all backing leases with RenewalManager. When you stop calling tick(), leases expire.
The store, the topic partitions, and all results vanish. No cleanup scripts, no cron jobs.
use grafos_leasekit::{RenewalManager, RenewalPolicy};
let mut renewals = RenewalManager::new();let retention_secs: u64 = 3600;let now = grafos_std::host::unix_time_secs();
// Register store and topic partition leasesfor lease_id in all_backing_lease_ids { renewals.register(lease_id, now + retention_secs, RenewalPolicy::default());}
// In your main loop:let _summary = renewals.tick(now);Failure Modes
- Worker crash:
ConsumerGroupheartbeat goes stale, partitions rebalance to surviving workers. Messages are reprocessed from the last committed offset. - Object deleted before processing:
store.get()returnsNone. Handle as a permanent error (don’t retry — the object is gone). - Store lease expires: All objects and results disappear. Workers see empty reads. This is the
intended behavior for bounded-retention pipelines. For durable pipelines, use
RenewalManagerto keep leases alive. - Topic partition full: Ring buffer wraps — oldest unprocessed messages are overwritten. Size
partition_capacityfor your ingest rate.
Observability
store_put_bytes/store_get_bytes— object I/O volume (enableobservefeature ongrafos-store)mq_messages_published/mq_messages_consumed— pipeline throughput- Consumer group assignment map — which worker owns which partitions
- Lease TTL remaining via
RenewalManager— time until automatic cleanup - End-to-end latency: timestamp at
store.put()vs timestamp at result write
Variations
- Fan-out: One object write produces multiple notifications on different topics (e.g.
thumbnails,metadata-extraction,search-indexing). Each topic has its own consumer group. - Tiered storage: Use
TieredObjectStoreinstead ofMemObjectStore— hot objects in DRAM, cold objects spill to block storage automatically. - Dead-letter routing: Add a
DlqRouterfor messages that fail processing after N retries. The DLQ topic shares the same lease-scoped lifetime. - Multi-stage pipeline: Chain multiple topic/consumer pairs — stage 1 writes to
stage-2-events, stage 2 writes tostage-3-events. Each stage is a separate consumer group. - Durable offsets: Replace
MemOffsetStorewithgrafos_kv::FabricKvStorefor offset persistence across worker restarts.
Testing
cargo test -p grafos-store -- mem_store # object store roundtripscargo test -p grafos-mq -- topic # topic lifecyclecargo test -p grafos-mq -- producer # message productioncargo test -p grafos-mq -- consumer # polling and offset trackingcargo test -p grafos-jobs -- coordinator # retry and aggregation