1extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use crate::message::Message;
8use crate::offset::OffsetStore;
9use crate::topic::TopicManager;
10use grafos_std::error::{FabricError, Result};
11
12#[derive(Clone, Debug, PartialEq, Eq)]
14pub enum SeekPolicy {
15 Earliest,
17 Latest,
19 Committed,
21}
22
23pub struct Consumer {
25 topic_name: String,
26 group: String,
27 assigned_partitions: Vec<u32>,
29 read_offsets: Vec<u64>,
31 initialized: bool,
33}
34
35impl Consumer {
36 pub fn new(topic_name: &str, group: &str) -> Self {
38 Consumer {
39 topic_name: String::from(topic_name),
40 group: String::from(group),
41 assigned_partitions: Vec::new(),
42 read_offsets: Vec::new(),
43 initialized: false,
44 }
45 }
46
47 pub fn assign(&mut self, partitions: &[u32]) {
49 self.assigned_partitions = partitions.to_vec();
50 self.read_offsets = alloc::vec![0u64; partitions.len()];
51 self.initialized = false;
52 }
53
54 pub fn seek(
56 &mut self,
57 mgr: &TopicManager,
58 offset_store: &dyn OffsetStore,
59 policy: SeekPolicy,
60 ) -> Result<()> {
61 let topic = mgr
62 .open(&self.topic_name)
63 .ok_or(FabricError::IoError(-11))?;
64
65 for (i, &part_idx) in self.assigned_partitions.iter().enumerate() {
66 let partition = topic.partition(part_idx).ok_or(FabricError::IoError(-12))?;
67 let offset = match &policy {
68 SeekPolicy::Earliest => partition.oldest_offset(),
69 SeekPolicy::Latest => partition.next_offset(),
70 SeekPolicy::Committed => {
71 if let Some(committed) =
72 offset_store.fetch(&self.topic_name, part_idx, &self.group)
73 {
74 committed + 1
76 } else {
77 partition.oldest_offset()
78 }
79 }
80 };
81 self.read_offsets[i] = offset;
82 }
83 self.initialized = true;
84 Ok(())
85 }
86
87 pub fn poll(&mut self, mgr: &TopicManager, max_messages: usize) -> Result<Vec<(u32, Message)>> {
92 if !self.initialized {
93 return Err(FabricError::IoError(-13));
94 }
95
96 let topic = mgr
97 .open(&self.topic_name)
98 .ok_or(FabricError::IoError(-11))?;
99 let mut results = Vec::new();
100
101 for (i, &part_idx) in self.assigned_partitions.iter().enumerate() {
102 if results.len() >= max_messages {
103 break;
104 }
105 let partition = topic.partition(part_idx).ok_or(FabricError::IoError(-12))?;
106 while results.len() < max_messages && self.read_offsets[i] < partition.next_offset() {
107 if let Some(msg) = partition.read_at(self.read_offsets[i])? {
108 #[cfg(feature = "observe")]
109 crate::observe_hooks::on_message_consumed(&self.topic_name, &self.group);
110 results.push((part_idx, msg));
111 }
112 self.read_offsets[i] += 1;
113 }
114 }
115
116 Ok(results)
117 }
118
119 pub fn commit(&self, offset_store: &mut dyn OffsetStore) {
121 for (i, &part_idx) in self.assigned_partitions.iter().enumerate() {
122 if self.read_offsets[i] > 0 {
123 offset_store.commit(
124 &self.topic_name,
125 part_idx,
126 &self.group,
127 self.read_offsets[i] - 1,
128 );
129 }
130 }
131 }
132
133 pub fn assigned_partitions(&self) -> &[u32] {
135 &self.assigned_partitions
136 }
137
138 pub fn group(&self) -> &str {
140 &self.group
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147 use crate::offset::MemOffsetStore;
148 use crate::producer::Producer;
149 use crate::topic::{TopicConfig, TopicManager};
150 use grafos_std::host;
151
152 fn setup() -> TopicManager {
153 host::reset_mock();
154 host::mock_set_fbmu_arena_size(1 << 20);
155 let mut mgr = TopicManager::new();
156 mgr.create(
157 "test",
158 TopicConfig {
159 num_partitions: 2,
160 partition_capacity: 32,
161 partition_stride: 256,
162 },
163 )
164 .expect("create");
165 mgr
166 }
167
168 #[test]
169 fn produce_consume_roundtrip() {
170 let mut mgr = setup();
171 let mut store = MemOffsetStore::new();
172
173 let mut prod = Producer::new("test");
175 prod.send_to(&mut mgr, 0, None, b"msg0").expect("send");
176 prod.send_to(&mut mgr, 0, None, b"msg1").expect("send");
177 prod.send_to(&mut mgr, 1, None, b"msg2").expect("send");
178
179 let mut consumer = Consumer::new("test", "group-a");
181 consumer.assign(&[0, 1]);
182 consumer
183 .seek(&mgr, &store, SeekPolicy::Earliest)
184 .expect("seek");
185
186 let msgs = consumer.poll(&mgr, 100).expect("poll");
187 assert_eq!(msgs.len(), 3);
188 assert_eq!(msgs[0].1.value, b"msg0");
189 assert_eq!(msgs[1].1.value, b"msg1");
190 assert_eq!(msgs[2].1.value, b"msg2");
191
192 consumer.commit(&mut store);
194 assert_eq!(store.fetch("test", 0, "group-a"), Some(1));
195 assert_eq!(store.fetch("test", 1, "group-a"), Some(0));
196 }
197
198 #[test]
199 fn seek_committed_resumes() {
200 let mut mgr = setup();
201 let mut store = MemOffsetStore::new();
202
203 let mut prod = Producer::new("test");
204 prod.send_to(&mut mgr, 0, None, b"a").expect("send");
205 prod.send_to(&mut mgr, 0, None, b"b").expect("send");
206 prod.send_to(&mut mgr, 0, None, b"c").expect("send");
207
208 let mut c1 = Consumer::new("test", "g1");
210 c1.assign(&[0]);
211 c1.seek(&mgr, &store, SeekPolicy::Earliest).expect("seek");
212 let msgs = c1.poll(&mgr, 2).expect("poll");
213 assert_eq!(msgs.len(), 2);
214 c1.commit(&mut store);
215
216 let mut c2 = Consumer::new("test", "g1");
218 c2.assign(&[0]);
219 c2.seek(&mgr, &store, SeekPolicy::Committed).expect("seek");
220 let msgs = c2.poll(&mgr, 100).expect("poll");
221 assert_eq!(msgs.len(), 1);
222 assert_eq!(msgs[0].1.value, b"c");
223 }
224
225 #[test]
226 fn seek_latest_skips_existing() {
227 let mut mgr = setup();
228 let store = MemOffsetStore::new();
229
230 let mut prod = Producer::new("test");
231 prod.send_to(&mut mgr, 0, None, b"old").expect("send");
232
233 let mut consumer = Consumer::new("test", "g1");
234 consumer.assign(&[0]);
235 consumer
236 .seek(&mgr, &store, SeekPolicy::Latest)
237 .expect("seek");
238
239 let msgs = consumer.poll(&mgr, 100).expect("poll");
241 assert!(msgs.is_empty());
242
243 let mut prod2 = Producer::new("test");
245 prod2.send_to(&mut mgr, 0, None, b"new").expect("send");
246 let msgs = consumer.poll(&mgr, 100).expect("poll");
247 assert_eq!(msgs.len(), 1);
248 assert_eq!(msgs[0].1.value, b"new");
249 }
250
251 #[test]
252 fn poll_without_seek_fails() {
253 let mgr = setup();
254 let mut consumer = Consumer::new("test", "g1");
255 consumer.assign(&[0]);
256 let result = consumer.poll(&mgr, 100);
257 assert!(result.is_err());
258 }
259}