Recipe 21: A Message Bus That Forgets on a Schedule
Situation
You need a message bus for event-driven microservices, but you also want data retention to be bounded and automatic. Traditional message brokers often leave topic cleanup to operators, and retention policies are configuration knobs that ops teams can forget to set.
The lease model gives you a natural tool: partition storage is leased fabric memory. When the retention window closes, the memory returns to the fabric automatically. No cron jobs, no retention scripts, no orphaned topics.
What You Build
A topic-partitioned message bus using grafos-mq where each partition is a ring buffer in leased memory.
Partition TTL equals the retention period. Dead-letter queues use the same temporal scoping. When consumers stop
and topics are abandoned, partition leases expire and memory returns.
Building Blocks
- grafos_mq::topic::{TopicManager, TopicConfig}: topic lifecycle
- grafos_mq::producer::Producer: message production with round-robin or key-hash partitioning
- grafos_mq::consumer::{Consumer, SeekPolicy}: partition-assigned polling
- grafos_mq::group::ConsumerGroup: decentralized partition assignment with lease-based liveness
- grafos_mq::dlq::DlqRouter: dead-letter routing after max retries
- grafos_mq::offset::MemOffsetStore: in-memory offset tracking
- grafos_leasekit::{RenewalManager, RenewalPolicy}: TTL-driven lease renewal
Design
Topic Lifecycle
A topic is a collection of partitions. Each partition is a ring buffer backed by a MemLease. The
TopicConfig controls partition count, slot capacity, and stride:
    let config = TopicConfig {
        num_partitions: 4,
        partition_capacity: 64,
        partition_stride: 512,
    };
    // Clone so that `config` stays usable when registering leases below.
    mgr.create("orders", config.clone())?;

Retention = Lease TTL
Register each partition’s backing lease with RenewalManager. Set the TTL to the retention period. When you
stop renewing, the partition’s memory returns to the fabric. No separate cleanup path.
    let mut renewals = RenewalManager::new();
    let retention_secs = 3600; // 1 hour retention
    let now = unix_time_secs();
    for part_id in 0..config.num_partitions {
        renewals.register(part_id as u64, now + retention_secs as u64, RenewalPolicy::default());
    }

Consumer Groups
ConsumerGroup provides decentralized partition assignment. Each consumer calls claim() to acquire unclaimed
or stale partitions. Liveness is tracked via heartbeat timestamps:
    let mut group = ConsumerGroup::new("processors", "orders", 4, 30);
    let partitions = group.claim("consumer-1");

If a consumer stops heartbeating, its partitions become claimable after stale_threshold_secs.
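The claim-and-staleness rule can be illustrated without the real library. The sketch below is a stand-in, not the grafos-mq implementation: ToyGroup, its fields, and the explicit now argument are hypothetical names chosen for illustration, and the greedy "take everything claimable" policy is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a consumer group; not the grafos-mq type.
pub struct ToyGroup {
    /// owner[p] is the consumer currently holding partition p, if any.
    owner: Vec<Option<String>>,
    /// Last heartbeat time in seconds, per consumer.
    last_beat: HashMap<String, u64>,
    stale_threshold_secs: u64,
}

impl ToyGroup {
    pub fn new(num_partitions: usize, stale_threshold_secs: u64) -> Self {
        Self {
            owner: vec![None; num_partitions],
            last_beat: HashMap::new(),
            stale_threshold_secs,
        }
    }

    /// Record that `consumer` is alive at time `now`.
    pub fn heartbeat(&mut self, consumer: &str, now: u64) {
        self.last_beat.insert(consumer.to_string(), now);
    }

    /// A holder is stale if it never heartbeated or its last heartbeat
    /// is older than the threshold.
    fn is_stale(&self, consumer: &str, now: u64) -> bool {
        match self.last_beat.get(consumer) {
            Some(&t) => now.saturating_sub(t) > self.stale_threshold_secs,
            None => true,
        }
    }

    /// Claim every partition that is unowned or whose owner is stale.
    /// Returns the partitions `consumer` holds after the call.
    pub fn claim(&mut self, consumer: &str, now: u64) -> Vec<usize> {
        self.heartbeat(consumer, now);
        for p in 0..self.owner.len() {
            let takeable = match &self.owner[p] {
                None => true,
                Some(holder) => holder.as_str() != consumer && self.is_stale(holder, now),
            };
            if takeable {
                self.owner[p] = Some(consumer.to_string());
            }
        }
        self.owner
            .iter()
            .enumerate()
            .filter(|(_, o)| o.as_deref() == Some(consumer))
            .map(|(p, _)| p)
            .collect()
    }
}
```

With a 30-second threshold, a consumer that claimed at t=100 and then went silent still holds its partitions at t=110. A second consumer calling claim at t=131 takes them over. A production group would likely also balance partitions across live consumers, which this sketch does not attempt.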
Dead-Letter Routing
DlqRouter tracks per-message nack counts. After max_retries failed processing attempts, the message routes
to a DLQ topic. The DLQ topic has the same lease-scoped lifetime as the source topic.
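The per-message counting rule can be sketched as follows. ToyNackTracker is a hypothetical type written for this illustration. Whether the real DlqRouter trips on the Nth failure or the one after it is not stated here, so the >= comparison is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical retry tracker; illustrates the counting rule only.
pub struct ToyNackTracker {
    max_retries: u32,
    /// Failed attempts keyed by (partition, offset).
    counts: HashMap<(u32, u64), u32>,
}

impl ToyNackTracker {
    pub fn new(max_retries: u32) -> Self {
        Self { max_retries, counts: HashMap::new() }
    }

    /// Record one failed attempt. Returns true once the message has failed
    /// `max_retries` times and should be routed to the dead-letter topic.
    pub fn nack(&mut self, partition: u32, offset: u64) -> bool {
        let n = self.counts.entry((partition, offset)).or_insert(0);
        *n += 1;
        let exhausted = *n >= self.max_retries;
        if exhausted {
            // Forget the message so the map does not grow without bound.
            self.counts.remove(&(partition, offset));
        }
        exhausted
    }
}
```

Keying on (partition, offset) keeps counts independent across partitions, so a poison message at offset 7 of partition 0 does not affect offset 7 of partition 1.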
Walkthrough (Implementation Sketch)
1. Create Topics with Retention
    use grafos_mq::topic::{TopicManager, TopicConfig};
    use grafos_leasekit::{RenewalManager, RenewalPolicy};

    let mut mgr = TopicManager::new();
    let config = TopicConfig {
        num_partitions: 4,
        partition_capacity: 64,
        partition_stride: 512,
    };
    mgr.create("events", config.clone())?;
    mgr.create("events-dlq", config)?;

    let mut renewals = RenewalManager::new();
    let retention_secs: u64 = 3600;
    let now = unix_time_secs();
    for i in 0..4 {
        renewals.register(i, now + retention_secs, RenewalPolicy::default());
    }

2. Produce Messages
    use grafos_mq::producer::{Producer, Partitioner};

    let mut prod = Producer::new("events").with_partitioner(Partitioner::KeyHash);
    prod.send_keyed(&mut mgr, b"user-42", b"login")?;
    prod.send_keyed(&mut mgr, b"user-42", b"purchase")?;
    prod.send_keyed(&mut mgr, b"user-99", b"signup")?;

Key-hash partitioning ensures all events for the same user land on the same partition, preserving order.
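To see why equal keys share a partition, here is a minimal key-hash partitioner using only the standard library. The function name partition_for and the choice of DefaultHasher are assumptions made for illustration. The hash function grafos-mq uses is not specified in this recipe.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to a partition index in 0..num_partitions.
/// Hypothetical helper; not part of grafos-mq.
pub fn partition_for(key: &[u8], num_partitions: u32) -> u32 {
    assert!(num_partitions > 0, "need at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // Reduce the 64-bit hash to a valid partition index.
    (hasher.finish() % u64::from(num_partitions)) as u32
}
```

Because the mapping depends only on the key bytes and the partition count, every message keyed b"user-42" goes to one partition and is appended in send order. Changing the partition count remaps keys, so ordering holds only while the count stays fixed. DefaultHasher's algorithm is not guaranteed stable across Rust releases, so a real bus would pin a specific hash.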
3. Consume with Group Coordination
    use grafos_mq::consumer::{Consumer, SeekPolicy};
    use grafos_mq::group::ConsumerGroup;
    use grafos_mq::offset::MemOffsetStore;

    let mut group = ConsumerGroup::new("processors", "events", 4, 30);
    let claimed = group.claim("worker-1");

    let mut store = MemOffsetStore::new();
    let mut consumer = Consumer::new("events", "processors");
    consumer.assign(&claimed);
    consumer.seek(&mgr, &store, SeekPolicy::Committed)?;

    let messages = consumer.poll(&mgr, 100)?;
    for (_part_idx, msg) in &messages {
        process_event(&msg.value)?;
    }
    consumer.commit(&mut store);
    group.heartbeat("worker-1");

4. Route Poison Messages to DLQ
    use grafos_mq::dlq::DlqRouter;

    let mut dlq = DlqRouter::new("events", "events-dlq", 3);

    for (part_idx, msg) in &messages {
        if process_event(&msg.value).is_err() {
            if dlq.nack(*part_idx, msg.offset) {
                dlq.route_to_dlq(&mut mgr, *part_idx, msg.offset, &msg.value)?;
            }
        }
    }

5. Abandonment and Expiry
When all consumers disconnect and RenewalManager::tick() is no longer called, partition leases expire.
The fabric reclaims the memory. No cleanup code runs. The topic simply ceases to exist.
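The expiry path described above can be modeled with a small lease table. This is a toy model under stated assumptions, not the fabric or grafos_leasekit: ToyLeases, renew_all, and reclaim are hypothetical names, and time is passed in explicitly so the behavior is easy to follow.

```rust
use std::collections::BTreeMap;

/// Hypothetical lease table; stands in for fabric-side expiry.
pub struct ToyLeases {
    /// Expiry time (Unix seconds) per partition id.
    expires_at: BTreeMap<u64, u64>,
}

impl ToyLeases {
    pub fn new() -> Self {
        Self { expires_at: BTreeMap::new() }
    }

    pub fn register(&mut self, id: u64, expires_at: u64) {
        self.expires_at.insert(id, expires_at);
    }

    /// Push every still-live lease out to `now + ttl`.
    /// Stands in for a periodic renewal tick.
    pub fn renew_all(&mut self, now: u64, ttl: u64) {
        for exp in self.expires_at.values_mut() {
            if *exp > now {
                *exp = now + ttl;
            }
        }
    }

    /// Drop leases whose expiry has passed; return their ids in ascending order.
    pub fn reclaim(&mut self, now: u64) -> Vec<u64> {
        let mut dead = Vec::new();
        self.expires_at.retain(|&id, exp| {
            if *exp <= now {
                dead.push(id);
                false
            } else {
                true
            }
        });
        dead
    }

    /// Number of leases still held.
    pub fn live(&self) -> usize {
        self.expires_at.len()
    }
}
```

While renew_all keeps being called, reclaim finds nothing to drop. Once the calls stop, the last expiry time set is final, and every partition is reclaimed when the clock passes it. Nothing in the application has to run for that to happen, which is the point of tying retention to the lease.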
Failure Modes
- Partition lease expires while consumers are active: consumers see empty reads. Restart the topic with fresh leases and seek to Earliest.
- Consumer stalls: the ConsumerGroup heartbeat goes stale and partitions rebalance to other consumers.
- DLQ fills up: the DLQ is itself a ring buffer, so the oldest dead letters are overwritten. Size the DLQ partition capacity for your expected poison message rate.
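The overwrite behavior behind the DLQ failure mode can be sketched as a bounded log that evicts its oldest entry when full. ToyRing is a hypothetical type for illustration. It models overwrite-oldest semantics only and says nothing about how grafos-mq lays out slots or strides in leased memory.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded log; models overwrite-oldest only.
pub struct ToyRing {
    capacity: usize,
    /// Offset of the oldest message still retained.
    base_offset: u64,
    slots: VecDeque<Vec<u8>>,
}

impl ToyRing {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self {
            capacity,
            base_offset: 0,
            slots: VecDeque::with_capacity(capacity),
        }
    }

    /// Append a message, evicting the oldest when full.
    /// Returns the offset assigned to the new message.
    pub fn push(&mut self, msg: &[u8]) -> u64 {
        if self.slots.len() == self.capacity {
            self.slots.pop_front();
            self.base_offset += 1;
        }
        self.slots.push_back(msg.to_vec());
        self.base_offset + self.slots.len() as u64 - 1
    }

    /// Read by offset. None if the message was overwritten or never written.
    pub fn get(&self, offset: u64) -> Option<&[u8]> {
        let idx = offset.checked_sub(self.base_offset)?;
        let idx = usize::try_from(idx).ok()?;
        self.slots.get(idx).map(|v| v.as_slice())
    }
}
```

With a capacity of 2, pushing three messages leaves offsets 1 and 2 readable and offset 0 gone. The same arithmetic gives a sizing rule of thumb: a partition with 64 slots receiving 2 poison messages per minute holds about 32 minutes of dead letters before the oldest is lost (illustrative numbers, not measurements).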
Observability
- Partition lease TTL remaining (from RenewalManager)
- Consumer group assignment map and stale consumer count
- DLQ nack counts per partition
- Messages produced/consumed per topic (enable the observe feature)
Variations
- Use grafos_kv::FabricKvStore for durable offset persistence across restarts
- Tiered retention: short TTL for hot topics, longer TTL for audit topics
- Fan-out: multiple consumer groups on the same topic, each with independent offsets
- Exactly-once semantics via the exactly-once feature flag (epoch fencing)