grafos_collections/queue.rs
1//! A bounded SPSC ring buffer stored in leased fabric memory.
2//!
3//! [`FabricQueue<T>`] implements a single-producer single-consumer FIFO queue
4//! backed by a leased memory region. Elements are serialized with postcard
5//! into fixed-size slots arranged in a circular buffer.
6//!
7//! # Memory layout
8//!
9//! ```text
10//! Offset 0: [header] head: u64 (8) | tail: u64 (8) | capacity: u64 (8) | stride: u64 (8)
11//! Offset 32: [slots] slot 0 (stride bytes) | slot 1 | ... | slot (capacity-1)
12//! ```
13//!
14//! `head` is the next index to read from, `tail` is the next index to write
15//! to. The queue is empty when `head == tail` and full when
16//! `(tail + 1) % capacity == head`. One slot is always reserved for the
17//! empty/full distinction, so the usable capacity is `capacity - 1`.
18//!
19//! # Example
20//!
21//! ```rust
22//! use grafos_collections::queue::FabricQueue;
23//! use grafos_std::mem::MemBuilder;
24//!
25//! # grafos_std::host::reset_mock();
26//! # grafos_std::host::mock_set_fbmu_arena_size(65536);
27//! let lease = MemBuilder::new().min_bytes(4096).acquire()?;
28//! let mut q: FabricQueue<u32> = FabricQueue::new(lease, 16, 16)?;
29//!
30//! q.push(&1)?;
31//! q.push(&2)?;
32//! assert_eq!(q.pop()?, Some(1)); // FIFO order
33//! # Ok::<(), grafos_std::FabricError>(())
34//! ```
35
36extern crate alloc;
37use alloc::vec;
38
39use grafos_std::error::{FabricError, Result};
40use grafos_std::mem::{MemBuilder, MemLease};
41
42use serde::{de::DeserializeOwned, Serialize};
43
44const HEADER_SIZE: u64 = 32;
45
46/// A bounded SPSC ring buffer stored in leased fabric memory.
47///
48/// The queue owns its [`MemLease`] and releases it on drop. Elements are
49/// stored in FIFO order: `push()` appends to the tail, `pop()` reads from
50/// the head.
51///
52/// # Non-blocking semantics
53///
54/// - `push()` returns `Ok(false)` when the queue is full.
55/// - `pop()` returns `Ok(None)` when the queue is empty.
56///
57/// Neither operation blocks or retries.
58///
59/// # Future direction: MPMC
60///
61/// This queue is currently single-producer single-consumer. A future MPMC
62/// variant would replace the plain `head`/`tail` fields with
63/// compare-and-swap (CAS) atomic operations on the remote memory, or
64/// alternatively use a lock-based protocol with a lease-scoped mutex in
65/// the header region. MPMC support is not yet implemented.
66///
67/// # Example
68///
69/// ```rust
70/// use grafos_collections::queue::FabricQueue;
71///
72/// # grafos_std::host::reset_mock();
73/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
74/// let mut q: FabricQueue<u32> = FabricQueue::with_capacity(8, 16)?;
75/// q.push(&10)?;
76/// q.push(&20)?;
77/// assert_eq!(q.pop()?, Some(10));
78/// assert_eq!(q.pop()?, Some(20));
79/// assert_eq!(q.pop()?, None);
80/// # Ok::<(), grafos_std::FabricError>(())
81/// ```
82pub struct FabricQueue<T> {
83 lease: MemLease,
84 head: u64,
85 tail: u64,
86 capacity: u64,
87 stride: u64,
88 _marker: core::marker::PhantomData<T>,
89}
90
91impl<T: Serialize + DeserializeOwned> FabricQueue<T> {
92 /// Create a new queue over the given lease.
93 ///
94 /// `capacity` is the total number of slots. `stride` is the slot width
95 /// in bytes (must accommodate 4-byte length prefix plus serialized
96 /// element). One slot is reserved for the empty/full distinction, so
97 /// the usable capacity is `capacity - 1`.
98 ///
99 /// # Errors
100 ///
101 /// Returns [`FabricError::CapacityExceeded`] if the arena is too small
102 /// to hold the header (32 bytes) plus `capacity * stride` bytes.
103 pub fn new(lease: MemLease, capacity: usize, stride: usize) -> Result<Self> {
104 let arena = lease.mem().arena_size()?;
105 let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
106 if arena < needed {
107 return Err(FabricError::CapacityExceeded);
108 }
109 let q = FabricQueue {
110 lease,
111 head: 0,
112 tail: 0,
113 capacity: capacity as u64,
114 stride: stride as u64,
115 _marker: core::marker::PhantomData,
116 };
117 q.write_header()?;
118 Ok(q)
119 }
120
121 /// Create a new queue by acquiring a lease sized for `capacity` elements
122 /// of `stride` bytes each.
123 ///
124 /// Convenience constructor that acquires a lease via `MemBuilder` and
125 /// then calls [`FabricQueue::new`].
126 ///
127 /// # Errors
128 ///
129 /// Returns [`FabricError::CapacityExceeded`] if the host cannot provide
130 /// a large enough arena.
131 pub fn with_capacity(capacity: usize, stride: usize) -> Result<Self> {
132 let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
133 let lease = MemBuilder::new().min_bytes(needed).acquire()?;
134 Self::new(lease, capacity, stride)
135 }
136
137 /// Push an element onto the back of the queue.
138 ///
139 /// Returns `Ok(true)` if the element was enqueued, `Ok(false)` if the
140 /// queue is full. Does not block.
141 ///
142 /// # Errors
143 ///
144 /// - [`FabricError::CapacityExceeded`] if the serialized element
145 /// (plus 4-byte length prefix) exceeds `stride`.
146 /// - [`FabricError::IoError`] if serialization or remote write fails.
147 pub fn push(&mut self, item: &T) -> Result<bool> {
148 if self.is_full() {
149 return Ok(false);
150 }
151 let bytes = postcard::to_allocvec(item).map_err(|_| FabricError::IoError(-1))?;
152 if bytes.len() as u64 + 4 > self.stride {
153 return Err(FabricError::CapacityExceeded);
154 }
155 let offset = self.slot_offset(self.tail);
156 let mut slot = vec![0u8; self.stride as usize];
157 let len_bytes = (bytes.len() as u32).to_le_bytes();
158 slot[..4].copy_from_slice(&len_bytes);
159 slot[4..4 + bytes.len()].copy_from_slice(&bytes);
160 self.lease.mem().write(offset, &slot)?;
161 self.tail = (self.tail + 1) % self.capacity;
162 self.write_header()?;
163 Ok(true)
164 }
165
166 /// Pop an element from the front of the queue.
167 ///
168 /// Returns `Ok(Some(item))` if an element was dequeued, `Ok(None)` if
169 /// the queue is empty. Does not block.
170 ///
171 /// # Errors
172 ///
173 /// Returns [`FabricError::IoError`] if the remote read or
174 /// deserialization fails.
175 pub fn pop(&mut self) -> Result<Option<T>> {
176 if self.is_empty() {
177 return Ok(None);
178 }
179 let offset = self.slot_offset(self.head);
180 let raw = self.lease.mem().read(offset, self.stride as u32)?;
181 let ser_len = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
182 let item: T =
183 postcard::from_bytes(&raw[4..4 + ser_len]).map_err(|_| FabricError::IoError(-1))?;
184 self.head = (self.head + 1) % self.capacity;
185 self.write_header()?;
186 Ok(Some(item))
187 }
188
189 /// Returns the number of elements currently in the queue.
190 pub fn len(&self) -> usize {
191 if self.tail >= self.head {
192 (self.tail - self.head) as usize
193 } else {
194 (self.capacity - self.head + self.tail) as usize
195 }
196 }
197
198 /// Returns `true` if the queue contains no elements.
199 pub fn is_empty(&self) -> bool {
200 self.head == self.tail
201 }
202
203 /// Returns `true` if the queue is full.
204 pub fn is_full(&self) -> bool {
205 (self.tail + 1) % self.capacity == self.head
206 }
207
208 /// Returns the lease ID of the memory lease for external renewal
209 /// management (e.g. via [`grafos_leasekit::RenewalManager`]).
210 pub fn lease_id(&self) -> u128 {
211 self.lease.lease_id()
212 }
213
214 /// Returns the expiry time (unix seconds) of the memory lease for
215 /// external renewal management.
216 pub fn expires_at_unix_secs(&self) -> u64 {
217 self.lease.expires_at_unix_secs()
218 }
219
220 /// Returns the maximum number of elements the queue can hold.
221 ///
222 /// This is `capacity - 1` since one slot is reserved for the
223 /// empty/full distinction.
224 pub fn max_len(&self) -> usize {
225 (self.capacity - 1) as usize
226 }
227
228 fn slot_offset(&self, index: u64) -> u64 {
229 HEADER_SIZE + index * self.stride
230 }
231
232 fn write_header(&self) -> Result<()> {
233 let mut hdr = [0u8; HEADER_SIZE as usize];
234 hdr[0..8].copy_from_slice(&self.head.to_le_bytes());
235 hdr[8..16].copy_from_slice(&self.tail.to_le_bytes());
236 hdr[16..24].copy_from_slice(&self.capacity.to_le_bytes());
237 hdr[24..32].copy_from_slice(&self.stride.to_le_bytes());
238 self.lease.mem().write(0, &hdr)
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use super::*;
245 use grafos_std::host;
246 use grafos_std::mem::MemBuilder;
247
248 fn setup(arena_size: u64) -> MemLease {
249 host::reset_mock();
250 host::mock_set_fbmu_arena_size(arena_size);
251 MemBuilder::new().acquire().expect("acquire")
252 }
253
254 #[test]
255 fn push_pop_fifo() {
256 let lease = setup(4096);
257 let mut q: FabricQueue<u32> = FabricQueue::new(lease, 16, 16).expect("new");
258
259 assert!(q.is_empty());
260 assert!(!q.is_full());
261 assert_eq!(q.len(), 0);
262
263 q.push(&1).expect("push");
264 q.push(&2).expect("push");
265 q.push(&3).expect("push");
266 assert_eq!(q.len(), 3);
267
268 assert_eq!(q.pop().expect("pop"), Some(1));
269 assert_eq!(q.pop().expect("pop"), Some(2));
270 assert_eq!(q.pop().expect("pop"), Some(3));
271 assert_eq!(q.pop().expect("pop"), None);
272 }
273
274 #[test]
275 fn full_returns_false() {
276 // capacity=4, stride=16, so 3 usable slots
277 let lease = setup(HEADER_SIZE + 4 * 16);
278 let mut q: FabricQueue<u32> = FabricQueue::new(lease, 4, 16).expect("new");
279
280 assert!(q.push(&1).expect("push 1"));
281 assert!(q.push(&2).expect("push 2"));
282 assert!(q.push(&3).expect("push 3"));
283 assert!(q.is_full());
284 assert!(!q.push(&4).expect("push 4 (full)"));
285 }
286
287 #[test]
288 fn wraparound() {
289 let lease = setup(HEADER_SIZE + 4 * 16);
290 let mut q: FabricQueue<u32> = FabricQueue::new(lease, 4, 16).expect("new");
291
292 // Fill and drain a few times to exercise wrap-around
293 for round in 0..3u32 {
294 let base = round * 100;
295 q.push(&(base + 1)).expect("push");
296 q.push(&(base + 2)).expect("push");
297 q.push(&(base + 3)).expect("push");
298 assert!(q.is_full());
299
300 assert_eq!(q.pop().expect("pop"), Some(base + 1));
301 assert_eq!(q.pop().expect("pop"), Some(base + 2));
302 assert_eq!(q.pop().expect("pop"), Some(base + 3));
303 assert!(q.is_empty());
304 }
305 }
306
307 #[test]
308 fn with_capacity_works() {
309 host::reset_mock();
310 host::mock_set_fbmu_arena_size(65536);
311 let q: FabricQueue<u64> = FabricQueue::with_capacity(32, 16).expect("with_capacity");
312 assert_eq!(q.max_len(), 31);
313 }
314
315 #[test]
316 fn interleaved_push_pop() {
317 let lease = setup(4096);
318 let mut q: FabricQueue<u32> = FabricQueue::new(lease, 8, 16).expect("new");
319
320 q.push(&10).expect("push");
321 q.push(&20).expect("push");
322 assert_eq!(q.pop().expect("pop"), Some(10));
323 q.push(&30).expect("push");
324 assert_eq!(q.pop().expect("pop"), Some(20));
325 assert_eq!(q.pop().expect("pop"), Some(30));
326 assert_eq!(q.pop().expect("pop"), None);
327 }
328}