grafos_mq/
offset.rs

1//! Offset storage: trait and in-memory implementation.
2
3extern crate alloc;
4use alloc::collections::BTreeMap;
5
6/// Trait for persisting consumer offsets.
7pub trait OffsetStore {
8    /// Commit the offset for a given topic/partition/group.
9    fn commit(&mut self, topic: &str, partition: u32, group: &str, offset: u64);
10
11    /// Fetch the last committed offset for a given topic/partition/group.
12    /// Returns `None` if no offset has been committed.
13    fn fetch(&self, topic: &str, partition: u32, group: &str) -> Option<u64>;
14}
15
16/// Key for the in-memory offset store.
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
18struct OffsetKey {
19    topic: alloc::string::String,
20    partition: u32,
21    group: alloc::string::String,
22}
23
24/// In-memory offset store backed by a BTreeMap.
25pub struct MemOffsetStore {
26    offsets: BTreeMap<OffsetKey, u64>,
27}
28
29impl MemOffsetStore {
30    /// Create a new empty offset store.
31    pub fn new() -> Self {
32        MemOffsetStore {
33            offsets: BTreeMap::new(),
34        }
35    }
36}
37
38impl Default for MemOffsetStore {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl OffsetStore for MemOffsetStore {
45    fn commit(&mut self, topic: &str, partition: u32, group: &str, offset: u64) {
46        let key = OffsetKey {
47            topic: alloc::string::String::from(topic),
48            partition,
49            group: alloc::string::String::from(group),
50        };
51        self.offsets.insert(key, offset);
52    }
53
54    fn fetch(&self, topic: &str, partition: u32, group: &str) -> Option<u64> {
55        let key = OffsetKey {
56            topic: alloc::string::String::from(topic),
57            partition,
58            group: alloc::string::String::from(group),
59        };
60        self.offsets.get(&key).copied()
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67
68    #[test]
69    fn commit_and_fetch() {
70        let mut store = MemOffsetStore::new();
71        assert_eq!(store.fetch("orders", 0, "group-a"), None);
72
73        store.commit("orders", 0, "group-a", 42);
74        assert_eq!(store.fetch("orders", 0, "group-a"), Some(42));
75
76        // Different partition
77        assert_eq!(store.fetch("orders", 1, "group-a"), None);
78
79        // Different group
80        assert_eq!(store.fetch("orders", 0, "group-b"), None);
81
82        // Update
83        store.commit("orders", 0, "group-a", 100);
84        assert_eq!(store.fetch("orders", 0, "group-a"), Some(100));
85    }
86}