1extern crate alloc;
4use alloc::collections::BTreeMap;
5use alloc::string::String;
6use alloc::vec::Vec;
7
8use crate::partition::Partition;
9use grafos_std::error::{FabricError, Result};
10
11#[derive(Clone, Debug)]
13pub struct TopicConfig {
14 pub num_partitions: u32,
16 pub partition_capacity: usize,
18 pub partition_stride: usize,
20}
21
22impl Default for TopicConfig {
23 fn default() -> Self {
24 TopicConfig {
25 num_partitions: 4,
26 partition_capacity: 64,
27 partition_stride: 512,
28 }
29 }
30}
31
32pub struct Topic {
34 name: String,
35 partitions: Vec<Partition>,
36}
37
38impl Topic {
39 pub(crate) fn new(name: &str, config: &TopicConfig) -> Result<Self> {
41 let mut partitions = Vec::with_capacity(config.num_partitions as usize);
42 for _ in 0..config.num_partitions {
43 partitions.push(Partition::new(
44 config.partition_capacity,
45 config.partition_stride,
46 )?);
47 }
48 Ok(Topic {
49 name: String::from(name),
50 partitions,
51 })
52 }
53
54 pub fn name(&self) -> &str {
56 &self.name
57 }
58
59 pub fn num_partitions(&self) -> u32 {
61 self.partitions.len() as u32
62 }
63
64 pub fn partition(&self, idx: u32) -> Option<&Partition> {
66 self.partitions.get(idx as usize)
67 }
68
69 pub fn partition_mut(&mut self, idx: u32) -> Option<&mut Partition> {
71 self.partitions.get_mut(idx as usize)
72 }
73}
74
75pub struct TopicManager {
77 topics: BTreeMap<String, Topic>,
78}
79
80impl TopicManager {
81 pub fn new() -> Self {
83 TopicManager {
84 topics: BTreeMap::new(),
85 }
86 }
87
88 pub fn create(&mut self, name: &str, config: TopicConfig) -> Result<()> {
92 if self.topics.contains_key(name) {
93 return Err(FabricError::IoError(-10));
94 }
95 let topic = Topic::new(name, &config)?;
96 self.topics.insert(String::from(name), topic);
97 #[cfg(feature = "observe")]
98 crate::observe_hooks::on_topic_created(name);
99 Ok(())
100 }
101
102 pub fn open(&self, name: &str) -> Option<&Topic> {
104 self.topics.get(name)
105 }
106
107 pub fn open_mut(&mut self, name: &str) -> Option<&mut Topic> {
109 self.topics.get_mut(name)
110 }
111
112 pub fn delete(&mut self, name: &str) -> bool {
114 self.topics.remove(name).is_some()
115 }
116
117 pub fn list(&self) -> Vec<String> {
119 self.topics.keys().cloned().collect()
120 }
121
122 pub fn len(&self) -> usize {
124 self.topics.len()
125 }
126
127 pub fn is_empty(&self) -> bool {
129 self.topics.is_empty()
130 }
131}
132
133impl Default for TopicManager {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use grafos_std::host;
143
144 fn setup() {
145 host::reset_mock();
146 host::mock_set_fbmu_arena_size(1 << 20); }
148
149 #[test]
150 fn create_open_delete() {
151 setup();
152 let mut mgr = TopicManager::new();
153 assert!(mgr.is_empty());
154
155 mgr.create("orders", TopicConfig::default())
156 .expect("create");
157 assert_eq!(mgr.len(), 1);
158 assert_eq!(mgr.list(), vec!["orders"]);
159
160 let topic = mgr.open("orders").expect("open");
161 assert_eq!(topic.name(), "orders");
162 assert_eq!(topic.num_partitions(), 4);
163
164 assert!(mgr.delete("orders"));
165 assert!(mgr.open("orders").is_none());
166 assert!(!mgr.delete("orders"));
167 }
168
169 #[test]
170 fn duplicate_create_fails() {
171 setup();
172 let mut mgr = TopicManager::new();
173 mgr.create("t1", TopicConfig::default()).expect("create");
174 let result = mgr.create("t1", TopicConfig::default());
175 assert!(result.is_err());
176 }
177
178 #[test]
179 fn custom_partition_count() {
180 setup();
181 let mut mgr = TopicManager::new();
182 let config = TopicConfig {
183 num_partitions: 8,
184 partition_capacity: 32,
185 partition_stride: 256,
186 };
187 mgr.create("events", config).expect("create");
188 let topic = mgr.open("events").expect("open");
189 assert_eq!(topic.num_partitions(), 8);
190 }
191}