1extern crate alloc;
15use alloc::vec;
16
17use crate::message::Message;
18use grafos_std::error::{FabricError, Result};
19use grafos_std::host;
20use grafos_std::mem::{MemBuilder, MemLease};
21
/// Size in bytes of the partition header kept at offset 0 of the lease:
/// four `u64` fields of eight bytes each.
const HEADER_SIZE: u64 = 4 * 8;
23
24pub struct Partition {
26 lease: MemLease,
27 write_offset: u64,
29 capacity: u64,
31 stride: u64,
33 msg_count: u64,
35}
36
37impl Partition {
38 pub fn new(capacity: usize, stride: usize) -> Result<Self> {
42 let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
43 let lease = MemBuilder::new().min_bytes(needed).acquire()?;
44 let p = Partition {
45 lease,
46 write_offset: 0,
47 capacity: capacity as u64,
48 stride: stride as u64,
49 msg_count: 0,
50 };
51 p.write_header()?;
52 Ok(p)
53 }
54
55 pub fn append(&mut self, key: Option<&[u8]>, value: &[u8]) -> Result<u64> {
61 let now = host::unix_time_secs();
62 let offset = self.msg_count;
63 let msg = Message {
64 offset,
65 timestamp: now,
66 key: key.map(|k| k.to_vec()),
67 value: value.to_vec(),
68 headers: alloc::vec::Vec::new(),
69 };
70
71 let bytes = postcard::to_allocvec(&msg).map_err(|_| FabricError::IoError(-1))?;
72 if bytes.len() as u64 + 4 > self.stride {
73 return Err(FabricError::CapacityExceeded);
74 }
75
76 let slot_idx = self.write_offset % self.capacity;
77 let slot_offset = HEADER_SIZE + slot_idx * self.stride;
78
79 let mut slot = vec![0u8; self.stride as usize];
80 let len_bytes = (bytes.len() as u32).to_le_bytes();
81 slot[..4].copy_from_slice(&len_bytes);
82 slot[4..4 + bytes.len()].copy_from_slice(&bytes);
83
84 self.lease.mem().write(slot_offset, &slot)?;
85 self.write_offset += 1;
86 self.msg_count += 1;
87 self.write_header()?;
88
89 Ok(offset)
90 }
91
92 pub fn read_at(&self, offset: u64) -> Result<Option<Message>> {
97 if offset >= self.msg_count {
98 return Ok(None);
99 }
100 let oldest_available = self.msg_count.saturating_sub(self.capacity);
102 if offset < oldest_available {
103 return Ok(None);
104 }
105
106 let slot_idx = offset % self.capacity;
107 let slot_offset = HEADER_SIZE + slot_idx * self.stride;
108
109 let raw = self.lease.mem().read(slot_offset, self.stride as u32)?;
110 let ser_len = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
111 if ser_len == 0 || ser_len + 4 > raw.len() {
112 return Ok(None);
113 }
114 let msg: Message =
115 postcard::from_bytes(&raw[4..4 + ser_len]).map_err(|_| FabricError::IoError(-1))?;
116 Ok(Some(msg))
117 }
118
119 pub fn next_offset(&self) -> u64 {
121 self.msg_count
122 }
123
124 pub fn oldest_offset(&self) -> u64 {
126 self.msg_count.saturating_sub(self.capacity)
127 }
128
129 pub fn len(&self) -> usize {
131 let available = self.msg_count - self.oldest_offset();
132 available as usize
133 }
134
135 pub fn is_empty(&self) -> bool {
137 self.msg_count == 0
138 }
139
140 pub fn capacity(&self) -> usize {
142 self.capacity as usize
143 }
144
145 fn write_header(&self) -> Result<()> {
146 let mut hdr = [0u8; HEADER_SIZE as usize];
147 hdr[0..8].copy_from_slice(&self.write_offset.to_le_bytes());
148 hdr[8..16].copy_from_slice(&self.capacity.to_le_bytes());
149 hdr[16..24].copy_from_slice(&self.stride.to_le_bytes());
150 hdr[24..32].copy_from_slice(&self.msg_count.to_le_bytes());
151 self.lease.mem().write(0, &hdr)
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Resets the mock host and sets its FBMU arena size to 65536 so each
    /// test starts from the same state.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);
    }

    /// A single unkeyed message round-trips through append and read.
    #[test]
    fn append_and_read() {
        setup();
        let mut partition = Partition::new(16, 256).expect("new");
        assert!(partition.is_empty());

        let assigned = partition.append(None, b"hello").expect("append");
        assert_eq!(assigned, 0);
        assert_eq!(partition.len(), 1);

        let stored = partition.read_at(0).expect("read").expect("some");
        assert_eq!(stored.offset, 0);
        assert_eq!(stored.value, b"hello");
        assert_eq!(stored.key, None);
    }

    /// The key is stored alongside the value.
    #[test]
    fn keyed_append() {
        setup();
        let mut partition = Partition::new(16, 256).expect("new");
        partition.append(Some(b"k1"), b"v1").expect("append");

        let stored = partition.read_at(0).expect("read").expect("some");
        assert_eq!(stored.key, Some(b"k1".to_vec()));
        assert_eq!(stored.value, b"v1");
    }

    /// Appending more messages than there are slots evicts the oldest ones.
    #[test]
    fn wraparound() {
        setup();
        let mut partition = Partition::new(4, 256).expect("new");

        for fill in 0..6u8 {
            partition.append(None, &[fill; 4]).expect("append");
        }

        assert_eq!(partition.oldest_offset(), 2);

        // Offsets 0 and 1 were overwritten by offsets 4 and 5.
        for evicted in 0..2u64 {
            assert!(partition.read_at(evicted).expect("read").is_none());
        }

        // The four most recent offsets are still readable.
        for live in 2..6u64 {
            let stored = partition.read_at(live).expect("read").expect("some");
            assert_eq!(stored.offset, live);
        }
    }

    /// Offsets that have not been written yet read as `None`.
    #[test]
    fn read_beyond_write() {
        setup();
        let mut partition = Partition::new(8, 256).expect("new");
        partition.append(None, b"one").expect("append");

        for &unwritten in &[1u64, 100] {
            assert!(partition.read_at(unwritten).expect("read").is_none());
        }
    }

    /// A message that cannot fit in one slot is rejected.
    #[test]
    fn capacity_exceeded_for_large_message() {
        setup();
        let mut partition = Partition::new(4, 32).expect("new");
        let oversized = vec![0u8; 200];

        let outcome = partition.append(None, &oversized);
        assert_eq!(outcome.unwrap_err(), FabricError::CapacityExceeded);
    }
}