grafos_collections/queue.rs
1//! A bounded SPSC ring buffer stored in leased fabric memory.
2//!
3//! [`FabricQueue<T>`] implements a single-producer single-consumer FIFO queue
4//! backed by a leased memory region. Elements are serialized with postcard
5//! into fixed-size slots arranged in a circular buffer.
6//!
7//! # Memory layout
8//!
9//! ```text
10//! Offset 0: [header] head: u64 (8) | tail: u64 (8) | capacity: u64 (8) | stride: u64 (8)
11//! Offset 32: [slots] slot 0 (stride bytes) | slot 1 | ... | slot (capacity-1)
12//! ```
13//!
14//! `head` is the next index to read from, `tail` is the next index to write
15//! to. The queue is empty when `head == tail` and full when
16//! `(tail + 1) % capacity == head`. One slot is always reserved for the
17//! empty/full distinction, so the usable capacity is `capacity - 1`.
18//!
19//! # Example
20//!
21//! ```rust
22//! use grafos_collections::queue::FabricQueue;
23//! use grafos_std::mem::MemBuilder;
24//!
25//! # grafos_std::host::reset_mock();
26//! # grafos_std::host::mock_set_fbmu_arena_size(65536);
27//! let lease = MemBuilder::new().min_bytes(4096).acquire()?;
28//! let mut q: FabricQueue<u32> = FabricQueue::new(lease, 16, 16)?;
29//!
30//! q.push(&1)?;
31//! q.push(&2)?;
32//! assert_eq!(q.pop()?, Some(1)); // FIFO order
33//! # Ok::<(), grafos_std::FabricError>(())
34//! ```
35
36extern crate alloc;
37use alloc::vec;
38
39use grafos_std::error::{FabricError, Result};
40use grafos_std::mem::{MemBuilder, MemLease};
41
42use serde::{de::DeserializeOwned, Serialize};
43
/// Size in bytes of the queue header: four little-endian `u64` fields
/// (`head`, `tail`, `capacity`, `stride`) stored at offset 0 of the lease.
const HEADER_SIZE: u64 = 4 * core::mem::size_of::<u64>() as u64;
45
46/// A bounded SPSC ring buffer stored in leased fabric memory.
47///
48/// The queue owns its [`MemLease`] and releases it on drop. Elements are
49/// stored in FIFO order: `push()` appends to the tail, `pop()` reads from
50/// the head.
51///
52/// `FabricQueue` is lease-scoped. It does not provide placement, quorum,
53/// replica-set membership, or cross-domain recovery by itself; use
54/// `grafos_replicated` for a logical replicated queue.
55///
56/// # Non-blocking semantics
57///
58/// - `push()` returns `Ok(false)` when the queue is full.
59/// - `pop()` returns `Ok(None)` when the queue is empty.
60///
61/// Neither operation blocks or retries.
62///
63/// # Future direction: MPMC
64///
65/// This queue is currently single-producer single-consumer. A future MPMC
66/// variant would replace the plain `head`/`tail` fields with
67/// compare-and-swap (CAS) atomic operations on the remote memory, or
68/// alternatively use a lock-based protocol with a lease-scoped mutex in
69/// the header region. MPMC support is not yet implemented.
70///
71/// # Example
72///
73/// ```rust
74/// use grafos_collections::queue::FabricQueue;
75///
76/// # grafos_std::host::reset_mock();
77/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
78/// let mut q: FabricQueue<u32> = FabricQueue::with_capacity(8, 16)?;
79/// q.push(&10)?;
80/// q.push(&20)?;
81/// assert_eq!(q.pop()?, Some(10));
82/// assert_eq!(q.pop()?, Some(20));
83/// assert_eq!(q.pop()?, None);
84/// # Ok::<(), grafos_std::FabricError>(())
85/// ```
86pub struct FabricQueue<T> {
87 lease: MemLease,
88 head: u64,
89 tail: u64,
90 capacity: u64,
91 stride: u64,
92 _marker: core::marker::PhantomData<T>,
93}
94
95impl<T: Serialize + DeserializeOwned> FabricQueue<T> {
96 /// Create a new queue over the given lease.
97 ///
98 /// `capacity` is the total number of slots. `stride` is the slot width
99 /// in bytes (must accommodate 4-byte length prefix plus serialized
100 /// element). One slot is reserved for the empty/full distinction, so
101 /// the usable capacity is `capacity - 1`.
102 ///
103 /// # Errors
104 ///
105 /// Returns [`FabricError::CapacityExceeded`] if the arena is too small
106 /// to hold the header (32 bytes) plus `capacity * stride` bytes.
107 pub fn new(lease: MemLease, capacity: usize, stride: usize) -> Result<Self> {
108 let arena = lease.mem().arena_size()?;
109 let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
110 if arena < needed {
111 return Err(FabricError::CapacityExceeded);
112 }
113 let q = FabricQueue {
114 lease,
115 head: 0,
116 tail: 0,
117 capacity: capacity as u64,
118 stride: stride as u64,
119 _marker: core::marker::PhantomData,
120 };
121 q.write_header()?;
122 Ok(q)
123 }
124
125 /// Create a new queue by acquiring a lease sized for `capacity` elements
126 /// of `stride` bytes each.
127 ///
128 /// Convenience constructor that acquires a lease via `MemBuilder` and
129 /// then calls [`FabricQueue::new`].
130 ///
131 /// # Errors
132 ///
133 /// Returns [`FabricError::CapacityExceeded`] if the host cannot provide
134 /// a large enough arena.
135 pub fn with_capacity(capacity: usize, stride: usize) -> Result<Self> {
136 let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
137 let lease = MemBuilder::new().min_bytes(needed).acquire()?;
138 Self::new(lease, capacity, stride)
139 }
140
141 /// Push an element onto the back of the queue.
142 ///
143 /// Returns `Ok(true)` if the element was enqueued, `Ok(false)` if the
144 /// queue is full. Does not block.
145 ///
146 /// # Errors
147 ///
148 /// - [`FabricError::CapacityExceeded`] if the serialized element
149 /// (plus 4-byte length prefix) exceeds `stride`.
150 /// - [`FabricError::IoError`] if serialization or remote write fails.
151 pub fn push(&mut self, item: &T) -> Result<bool> {
152 if self.is_full() {
153 return Ok(false);
154 }
155 let bytes = postcard::to_allocvec(item).map_err(|_| FabricError::IoError(-1))?;
156 if bytes.len() as u64 + 4 > self.stride {
157 return Err(FabricError::CapacityExceeded);
158 }
159 let offset = self.slot_offset(self.tail);
160 let mut slot = vec![0u8; self.stride as usize];
161 let len_bytes = (bytes.len() as u32).to_le_bytes();
162 slot[..4].copy_from_slice(&len_bytes);
163 slot[4..4 + bytes.len()].copy_from_slice(&bytes);
164 self.lease.mem().write(offset, &slot)?;
165 self.tail = (self.tail + 1) % self.capacity;
166 self.write_header()?;
167 Ok(true)
168 }
169
170 /// Pop an element from the front of the queue.
171 ///
172 /// Returns `Ok(Some(item))` if an element was dequeued, `Ok(None)` if
173 /// the queue is empty. Does not block.
174 ///
175 /// # Errors
176 ///
177 /// Returns [`FabricError::IoError`] if the remote read or
178 /// deserialization fails.
179 pub fn pop(&mut self) -> Result<Option<T>> {
180 if self.is_empty() {
181 return Ok(None);
182 }
183 let offset = self.slot_offset(self.head);
184 let raw = self.lease.mem().read(offset, self.stride as u32)?;
185 let ser_len = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
186 let item: T =
187 postcard::from_bytes(&raw[4..4 + ser_len]).map_err(|_| FabricError::IoError(-1))?;
188 self.head = (self.head + 1) % self.capacity;
189 self.write_header()?;
190 Ok(Some(item))
191 }
192
193 /// Returns the number of elements currently in the queue.
194 pub fn len(&self) -> usize {
195 if self.tail >= self.head {
196 (self.tail - self.head) as usize
197 } else {
198 (self.capacity - self.head + self.tail) as usize
199 }
200 }
201
202 /// Returns `true` if the queue contains no elements.
203 pub fn is_empty(&self) -> bool {
204 self.head == self.tail
205 }
206
207 /// Returns `true` if the queue is full.
208 pub fn is_full(&self) -> bool {
209 (self.tail + 1) % self.capacity == self.head
210 }
211
212 /// Returns the lease ID of the memory lease for external renewal
213 /// management (e.g. via [`grafos_leasekit::RenewalManager`]).
214 pub fn lease_id(&self) -> u128 {
215 self.lease.lease_id()
216 }
217
218 /// Returns the expiry time (unix seconds) of the memory lease for
219 /// external renewal management.
220 pub fn expires_at_unix_secs(&self) -> u64 {
221 self.lease.expires_at_unix_secs()
222 }
223
224 /// Returns the maximum number of elements the queue can hold.
225 ///
226 /// This is `capacity - 1` since one slot is reserved for the
227 /// empty/full distinction.
228 pub fn max_len(&self) -> usize {
229 (self.capacity - 1) as usize
230 }
231
232 fn slot_offset(&self, index: u64) -> u64 {
233 HEADER_SIZE + index * self.stride
234 }
235
236 fn write_header(&self) -> Result<()> {
237 let mut hdr = [0u8; HEADER_SIZE as usize];
238 hdr[0..8].copy_from_slice(&self.head.to_le_bytes());
239 hdr[8..16].copy_from_slice(&self.tail.to_le_bytes());
240 hdr[16..24].copy_from_slice(&self.capacity.to_le_bytes());
241 hdr[24..32].copy_from_slice(&self.stride.to_le_bytes());
242 self.lease.mem().write(0, &hdr)
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Reset the mock host, give it an arena of `arena_size` bytes, and
    /// acquire a lease with default builder settings.
    fn setup(arena_size: u64) -> MemLease {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(arena_size);
        MemBuilder::new().acquire().expect("acquire")
    }

    /// Elements come back out in the order they went in.
    #[test]
    fn push_pop_fifo() {
        let mut queue: FabricQueue<u32> = FabricQueue::new(setup(4096), 16, 16).expect("new");

        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(queue.len(), 0);

        for value in 1..=3u32 {
            queue.push(&value).expect("push");
        }
        assert_eq!(queue.len(), 3);

        for expected in 1..=3u32 {
            assert_eq!(queue.pop().expect("pop"), Some(expected));
        }
        assert_eq!(queue.pop().expect("pop"), None);
    }

    /// Pushing into a full queue reports `false` instead of failing.
    #[test]
    fn full_returns_false() {
        // Four slots of 16 bytes: one is reserved, three are usable.
        let mut queue: FabricQueue<u32> =
            FabricQueue::new(setup(HEADER_SIZE + 4 * 16), 4, 16).expect("new");

        let first = queue.push(&1).expect("push 1");
        assert!(first);
        let second = queue.push(&2).expect("push 2");
        assert!(second);
        let third = queue.push(&3).expect("push 3");
        assert!(third);
        assert!(queue.is_full());

        let fourth = queue.push(&4).expect("push 4 (full)");
        assert!(!fourth);
    }

    /// Repeated fill/drain cycles move the indices past the end of the ring.
    #[test]
    fn wraparound() {
        let mut queue: FabricQueue<u32> =
            FabricQueue::new(setup(HEADER_SIZE + 4 * 16), 4, 16).expect("new");

        for round in 0..3u32 {
            let base = round * 100;

            for delta in 1..=3u32 {
                queue.push(&(base + delta)).expect("push");
            }
            assert!(queue.is_full());

            for delta in 1..=3u32 {
                assert_eq!(queue.pop().expect("pop"), Some(base + delta));
            }
            assert!(queue.is_empty());
        }
    }

    /// `with_capacity` acquires its own lease and reserves one slot.
    #[test]
    fn with_capacity_works() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);
        let queue: FabricQueue<u64> = FabricQueue::with_capacity(32, 16).expect("with_capacity");
        assert_eq!(queue.max_len(), 31);
    }

    /// FIFO order holds when pushes and pops alternate.
    #[test]
    fn interleaved_push_pop() {
        let mut queue: FabricQueue<u32> = FabricQueue::new(setup(4096), 8, 16).expect("new");

        queue.push(&10).expect("push");
        queue.push(&20).expect("push");
        assert_eq!(queue.pop().expect("pop"), Some(10));

        queue.push(&30).expect("push");
        for expected in [Some(20), Some(30), None] {
            assert_eq!(queue.pop().expect("pop"), expected);
        }
    }
}