// grafos_mq/consumer.rs

1//! Consumer: poll messages from assigned partitions, commit offsets, seek.
2
3extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use crate::message::Message;
8use crate::offset::OffsetStore;
9use crate::topic::TopicManager;
10use grafos_std::error::{FabricError, Result};
11
12/// Seek policy for initializing consumer position.
/// Seek policy for initializing consumer position.
///
/// All variants are fieldless, so the enum is `Copy`: a policy value can be
/// passed to [`Consumer::seek`] repeatedly without cloning.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SeekPolicy {
    /// Start from the earliest available message.
    Earliest,
    /// Start from the latest message (only new messages).
    Latest,
    /// Start from the last committed offset (or earliest if none).
    Committed,
}
22
23/// A consumer that reads from specific partitions of a topic.
/// A consumer that reads from specific partitions of a topic.
///
/// Lifecycle: construct with `new`, choose partitions with `assign`,
/// position with `seek`, then `poll` for messages and `commit` progress.
pub struct Consumer {
    /// Name of the topic this consumer reads from.
    topic_name: String,
    /// Consumer-group name used when committing and fetching offsets.
    group: String,
    /// Assigned partition indices.
    assigned_partitions: Vec<u32>,
    /// Current read offset per partition; parallel to `assigned_partitions`
    /// (`read_offsets[i]` is the next offset to read from partition
    /// `assigned_partitions[i]`).
    read_offsets: Vec<u64>,
    /// Whether offsets have been initialized by `seek`; `poll` errors until then.
    initialized: bool,
}
34
35impl Consumer {
36    /// Create a new consumer for the named topic and consumer group.
37    pub fn new(topic_name: &str, group: &str) -> Self {
38        Consumer {
39            topic_name: String::from(topic_name),
40            group: String::from(group),
41            assigned_partitions: Vec::new(),
42            read_offsets: Vec::new(),
43            initialized: false,
44        }
45    }
46
47    /// Assign specific partitions to this consumer.
48    pub fn assign(&mut self, partitions: &[u32]) {
49        self.assigned_partitions = partitions.to_vec();
50        self.read_offsets = alloc::vec![0u64; partitions.len()];
51        self.initialized = false;
52    }
53
54    /// Initialize read offsets according to the seek policy.
55    pub fn seek(
56        &mut self,
57        mgr: &TopicManager,
58        offset_store: &dyn OffsetStore,
59        policy: SeekPolicy,
60    ) -> Result<()> {
61        let topic = mgr
62            .open(&self.topic_name)
63            .ok_or(FabricError::IoError(-11))?;
64
65        for (i, &part_idx) in self.assigned_partitions.iter().enumerate() {
66            let partition = topic.partition(part_idx).ok_or(FabricError::IoError(-12))?;
67            let offset = match &policy {
68                SeekPolicy::Earliest => partition.oldest_offset(),
69                SeekPolicy::Latest => partition.next_offset(),
70                SeekPolicy::Committed => {
71                    if let Some(committed) =
72                        offset_store.fetch(&self.topic_name, part_idx, &self.group)
73                    {
74                        // Resume from after the committed offset
75                        committed + 1
76                    } else {
77                        partition.oldest_offset()
78                    }
79                }
80            };
81            self.read_offsets[i] = offset;
82        }
83        self.initialized = true;
84        Ok(())
85    }
86
87    /// Poll for the next batch of messages across all assigned partitions.
88    ///
89    /// Returns up to `max_messages` messages. Messages are interleaved across
90    /// partitions in round-robin order.
91    pub fn poll(&mut self, mgr: &TopicManager, max_messages: usize) -> Result<Vec<(u32, Message)>> {
92        if !self.initialized {
93            return Err(FabricError::IoError(-13));
94        }
95
96        let topic = mgr
97            .open(&self.topic_name)
98            .ok_or(FabricError::IoError(-11))?;
99        let mut results = Vec::new();
100
101        for (i, &part_idx) in self.assigned_partitions.iter().enumerate() {
102            if results.len() >= max_messages {
103                break;
104            }
105            let partition = topic.partition(part_idx).ok_or(FabricError::IoError(-12))?;
106            while results.len() < max_messages && self.read_offsets[i] < partition.next_offset() {
107                if let Some(msg) = partition.read_at(self.read_offsets[i])? {
108                    #[cfg(feature = "observe")]
109                    crate::observe_hooks::on_message_consumed(&self.topic_name, &self.group);
110                    results.push((part_idx, msg));
111                }
112                self.read_offsets[i] += 1;
113            }
114        }
115
116        Ok(results)
117    }
118
119    /// Commit the current read offsets to the offset store.
120    pub fn commit(&self, offset_store: &mut dyn OffsetStore) {
121        for (i, &part_idx) in self.assigned_partitions.iter().enumerate() {
122            if self.read_offsets[i] > 0 {
123                offset_store.commit(
124                    &self.topic_name,
125                    part_idx,
126                    &self.group,
127                    self.read_offsets[i] - 1,
128                );
129            }
130        }
131    }
132
133    /// Returns the assigned partitions.
134    pub fn assigned_partitions(&self) -> &[u32] {
135        &self.assigned_partitions
136    }
137
138    /// Returns the consumer group name.
139    pub fn group(&self) -> &str {
140        &self.group
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::offset::MemOffsetStore;
    use crate::producer::Producer;
    use crate::topic::{TopicConfig, TopicManager};
    use grafos_std::host;

    /// Build a fresh manager holding one two-partition topic named "test".
    fn setup() -> TopicManager {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1 << 20);
        let mut manager = TopicManager::new();
        let config = TopicConfig {
            num_partitions: 2,
            partition_capacity: 32,
            partition_stride: 256,
        };
        manager.create("test", config).expect("create");
        manager
    }

    #[test]
    fn produce_consume_roundtrip() {
        let mut manager = setup();
        let mut offsets = MemOffsetStore::new();

        // Two messages into partition 0, one into partition 1.
        let mut producer = Producer::new("test");
        producer.send_to(&mut manager, 0, None, b"msg0").expect("send");
        producer.send_to(&mut manager, 0, None, b"msg1").expect("send");
        producer.send_to(&mut manager, 1, None, b"msg2").expect("send");

        let mut reader = Consumer::new("test", "group-a");
        reader.assign(&[0, 1]);
        reader
            .seek(&manager, &offsets, SeekPolicy::Earliest)
            .expect("seek");

        let batch = reader.poll(&manager, 100).expect("poll");
        assert_eq!(batch.len(), 3);
        assert_eq!(batch[0].1.value, b"msg0");
        assert_eq!(batch[1].1.value, b"msg1");
        assert_eq!(batch[2].1.value, b"msg2");

        // Committed offsets are last-consumed per partition.
        reader.commit(&mut offsets);
        assert_eq!(offsets.fetch("test", 0, "group-a"), Some(1));
        assert_eq!(offsets.fetch("test", 1, "group-a"), Some(0));
    }

    #[test]
    fn seek_committed_resumes() {
        let mut manager = setup();
        let mut offsets = MemOffsetStore::new();

        let mut producer = Producer::new("test");
        producer.send_to(&mut manager, 0, None, b"a").expect("send");
        producer.send_to(&mut manager, 0, None, b"b").expect("send");
        producer.send_to(&mut manager, 0, None, b"c").expect("send");

        // First consumer reads two messages and commits its position.
        let mut first = Consumer::new("test", "g1");
        first.assign(&[0]);
        first.seek(&manager, &offsets, SeekPolicy::Earliest).expect("seek");
        assert_eq!(first.poll(&manager, 2).expect("poll").len(), 2);
        first.commit(&mut offsets);

        // A fresh consumer in the same group picks up right after the commit.
        let mut second = Consumer::new("test", "g1");
        second.assign(&[0]);
        second.seek(&manager, &offsets, SeekPolicy::Committed).expect("seek");
        let remaining = second.poll(&manager, 100).expect("poll");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].1.value, b"c");
    }

    #[test]
    fn seek_latest_skips_existing() {
        let mut manager = setup();
        let offsets = MemOffsetStore::new();

        Producer::new("test")
            .send_to(&mut manager, 0, None, b"old")
            .expect("send");

        let mut watcher = Consumer::new("test", "g1");
        watcher.assign(&[0]);
        watcher
            .seek(&manager, &offsets, SeekPolicy::Latest)
            .expect("seek");

        // Everything published before the seek is invisible.
        assert!(watcher.poll(&manager, 100).expect("poll").is_empty());

        // Messages published afterwards are delivered.
        Producer::new("test")
            .send_to(&mut manager, 0, None, b"new")
            .expect("send");
        let fresh = watcher.poll(&manager, 100).expect("poll");
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[0].1.value, b"new");
    }

    #[test]
    fn poll_without_seek_fails() {
        let manager = setup();
        let mut unseeked = Consumer::new("test", "g1");
        unseeked.assign(&[0]);
        assert!(unseeked.poll(&manager, 100).is_err());
    }
}
259}