1extern crate alloc;
9use alloc::collections::BTreeMap;
10use alloc::string::String;
11use alloc::vec::Vec;
12
13use grafos_std::host;
14
/// Ownership record for a single partition.
///
/// One record is stored per claimed partition; it names the consumer that
/// currently holds the partition and when that consumer was last seen.
#[derive(Debug, Clone)]
struct Assignment {
    /// Identifier of the consumer that owns the partition.
    consumer_id: String,
    /// Host Unix time, in seconds, at which the owner last claimed the
    /// partition or sent a heartbeat.
    last_heartbeat: u64,
}
21
22pub struct ConsumerGroup {
24 group_name: String,
25 topic_name: String,
26 num_partitions: u32,
27 assignments: BTreeMap<u32, Assignment>,
29 stale_threshold_secs: u64,
31}
32
33impl ConsumerGroup {
34 pub fn new(
36 group_name: &str,
37 topic_name: &str,
38 num_partitions: u32,
39 stale_threshold_secs: u64,
40 ) -> Self {
41 ConsumerGroup {
42 group_name: String::from(group_name),
43 topic_name: String::from(topic_name),
44 num_partitions,
45 assignments: BTreeMap::new(),
46 stale_threshold_secs,
47 }
48 }
49
50 pub fn claim(&mut self, consumer_id: &str) -> Vec<u32> {
54 let now = host::unix_time_secs();
55 let mut claimed = Vec::new();
56
57 for part_idx in 0..self.num_partitions {
58 let should_claim = match self.assignments.get(&part_idx) {
59 None => true,
60 Some(a) => {
61 if a.consumer_id == consumer_id {
62 true
63 } else {
64 now.saturating_sub(a.last_heartbeat) > self.stale_threshold_secs
65 }
66 }
67 };
68
69 if should_claim {
70 self.assignments.insert(
71 part_idx,
72 Assignment {
73 consumer_id: String::from(consumer_id),
74 last_heartbeat: now,
75 },
76 );
77 claimed.push(part_idx);
78 }
79 }
80
81 claimed
82 }
83
84 pub fn assigned_to(&self, consumer_id: &str) -> Vec<u32> {
86 self.assignments
87 .iter()
88 .filter(|(_, a)| a.consumer_id == consumer_id)
89 .map(|(&idx, _)| idx)
90 .collect()
91 }
92
93 pub fn heartbeat(&mut self, consumer_id: &str) {
95 let now = host::unix_time_secs();
96 for a in self.assignments.values_mut() {
97 if a.consumer_id == consumer_id {
98 a.last_heartbeat = now;
99 }
100 }
101 }
102
103 pub fn release(&mut self, consumer_id: &str) {
105 self.assignments.retain(|_, a| a.consumer_id != consumer_id);
106 }
107
108 pub fn group_name(&self) -> &str {
110 &self.group_name
111 }
112
113 pub fn topic_name(&self) -> &str {
115 &self.topic_name
116 }
117
118 pub fn unassigned_count(&self) -> u32 {
120 let assigned = self.assignments.len() as u32;
121 self.num_partitions.saturating_sub(assigned)
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Resets the host mock and pins its clock to 1000 s so every test
    /// starts from the same point in time.
    fn setup() {
        host::reset_mock();
        host::mock_set_unix_time_secs(1000);
    }

    /// Fixture shared by all tests: group "g1" on topic "orders" with four
    /// partitions and a 30-second stale threshold.
    fn orders_group() -> ConsumerGroup {
        ConsumerGroup::new("g1", "orders", 4, 30)
    }

    /// A lone consumer receives every partition.
    #[test]
    fn single_consumer_claims_all() {
        setup();
        let mut cg = orders_group();

        assert_eq!(cg.claim("c1"), vec![0, 1, 2, 3]);
        assert_eq!(cg.assigned_to("c1"), vec![0, 1, 2, 3]);
        assert_eq!(cg.unassigned_count(), 0);
    }

    /// A second consumer gets nothing while the first is fresh, and
    /// everything once the first has gone stale.
    #[test]
    fn second_consumer_waits_for_stale() {
        setup();
        let mut cg = orders_group();

        cg.claim("c1");

        // c1 has only just claimed, so nothing is available to c2.
        assert!(cg.claim("c2").is_empty());

        // Move the mock clock one second past the 30-second threshold.
        host::mock_advance_time_secs(31);

        assert_eq!(cg.claim("c2"), vec![0, 1, 2, 3]);
        assert_eq!(cg.assigned_to("c2"), vec![0, 1, 2, 3]);
        assert!(cg.assigned_to("c1").is_empty());
    }

    /// A heartbeat inside the threshold window keeps the owner's lease.
    #[test]
    fn heartbeat_prevents_stale_takeover() {
        setup();
        let mut cg = orders_group();
        cg.claim("c1");

        host::mock_advance_time_secs(20);
        cg.heartbeat("c1");

        // 40 s after the claim, but only 20 s after the heartbeat.
        host::mock_advance_time_secs(20);

        assert!(cg.claim("c2").is_empty());
    }

    /// Released partitions become unassigned and can be claimed at once.
    #[test]
    fn release_frees_partitions() {
        setup();
        let mut cg = orders_group();
        cg.claim("c1");

        cg.release("c1");
        assert_eq!(cg.unassigned_count(), 4);

        assert_eq!(cg.claim("c2"), vec![0, 1, 2, 3]);
    }
}