grafos_sync/barrier.rs
1//! Distributed barrier for multi-party phase synchronization.
2//!
3//! A [`FabricBarrier`] blocks each participant at a synchronization point
4//! until all `num_parties` have arrived. On the last arrival, the generation
5//! counter advances and the arrival count resets, making the barrier reusable
6//! across successive phases.
7//!
8//! # Memory layout at `base_offset`
9//!
10//! ```text
11//! offset +0: generation (u64, 8 bytes) — incremented when all parties arrive
12//! offset +8: count (u64, 8 bytes) — number of parties that have called wait()
13//! offset +16: num_parties (u32, 4 bytes) — total parties expected
14//! ```
15//!
16//! Total header: 20 bytes.
17
18use grafos_std::error::{FabricError, Result};
19use grafos_std::mem::MemLease;
20
/// Byte offset of the `generation` field (u64) relative to `base_offset`.
const GENERATION_OFFSET: u64 = 0;
/// Byte offset of the `count` field (u64); it follows the 8-byte generation.
const COUNT_OFFSET: u64 = GENERATION_OFFSET + 8;
/// Byte offset of the `num_parties` field (u32); it follows the 8-byte count.
const NUM_PARTIES_OFFSET: u64 = COUNT_OFFSET + 8;
24
25/// Distributed barrier backed by a fabric memory lease.
26///
27/// All `num_parties` participants must call [`wait`](FabricBarrier::wait)
28/// before any of them proceed. After all parties arrive, the generation
29/// counter increments and the barrier resets for reuse across successive
30/// synchronization rounds.
31///
32/// If a party crashes, the lease eventually expires and remaining parties
33/// will observe a timeout ([`FabricError::LeaseExpired`]).
34///
35/// # Example
36///
37/// ```rust
38/// # grafos_std::host::reset_mock();
39/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
40/// use grafos_sync::FabricBarrier;
41/// use grafos_std::mem::MemBuilder;
42///
43/// let lease = MemBuilder::new().acquire().unwrap();
44/// let barrier = FabricBarrier::new(lease, 0, 1).unwrap();
45///
46/// // With 1 party, wait() completes immediately
47/// let gen = barrier.wait(10).unwrap();
48/// assert_eq!(gen, 1);
49/// ```
50pub struct FabricBarrier {
51 lease: MemLease,
52 base_offset: u64,
53 num_parties: u32,
54}
55
56fn read_u64(mem: &grafos_std::mem::FabricMem, offset: u64) -> Result<u64> {
57 let data = mem.read(offset, 8)?;
58 if data.len() < 8 {
59 return Ok(0);
60 }
61 let mut buf = [0u8; 8];
62 buf.copy_from_slice(&data[..8]);
63 Ok(u64::from_le_bytes(buf))
64}
65
66fn write_u64(mem: &grafos_std::mem::FabricMem, offset: u64, val: u64) -> Result<()> {
67 mem.write(offset, &val.to_le_bytes())
68}
69
70fn write_u32(mem: &grafos_std::mem::FabricMem, offset: u64, val: u32) -> Result<()> {
71 mem.write(offset, &val.to_le_bytes())
72}
73
74impl FabricBarrier {
75 /// Create a new barrier for `num_parties` participants.
76 ///
77 /// Initializes the barrier state in leased memory at `base_offset`.
78 pub fn new(lease: MemLease, base_offset: u64, num_parties: u32) -> Result<Self> {
79 let mem = lease.mem();
80 write_u64(mem, base_offset + GENERATION_OFFSET, 0)?;
81 write_u64(mem, base_offset + COUNT_OFFSET, 0)?;
82 write_u32(mem, base_offset + NUM_PARTIES_OFFSET, num_parties)?;
83
84 Ok(FabricBarrier {
85 lease,
86 base_offset,
87 num_parties,
88 })
89 }
90
91 /// Wait at the barrier.
92 ///
93 /// Increments the arrival count. If this is the last party to arrive,
94 /// the generation is incremented and the count is reset to zero. Otherwise,
95 /// polls until the generation changes, indicating all parties have arrived.
96 ///
97 /// Returns the new generation number on success.
98 ///
99 /// `max_polls` limits how many polling iterations before returning
100 /// [`FabricError::LeaseExpired`].
101 pub fn wait(&self, max_polls: u32) -> Result<u64> {
102 let mem = self.lease.mem();
103 let base = self.base_offset;
104
105 let gen_before = read_u64(mem, base + GENERATION_OFFSET)?;
106 let count = read_u64(mem, base + COUNT_OFFSET)?;
107 let new_count = count + 1;
108
109 if new_count >= self.num_parties as u64 {
110 // We are the last party — advance generation, reset count
111 write_u64(mem, base + GENERATION_OFFSET, gen_before + 1)?;
112 write_u64(mem, base + COUNT_OFFSET, 0)?;
113 return Ok(gen_before + 1);
114 }
115
116 // Not the last — write our arrival and wait for generation to change
117 write_u64(mem, base + COUNT_OFFSET, new_count)?;
118
119 for _ in 0..max_polls {
120 let gen_now = read_u64(mem, base + GENERATION_OFFSET)?;
121 if gen_now != gen_before {
122 return Ok(gen_now);
123 }
124 }
125
126 Err(FabricError::LeaseExpired)
127 }
128
129 /// Read the current generation counter.
130 ///
131 /// The generation starts at 0 and increments by one each time all
132 /// `num_parties` have arrived and the barrier completes a round.
133 pub fn generation(&self) -> Result<u64> {
134 read_u64(self.lease.mem(), self.base_offset + GENERATION_OFFSET)
135 }
136
137 /// Returns the lease ID of the underlying memory lease for external
138 /// renewal management (e.g. via [`grafos_leasekit::RenewalManager`]).
139 pub fn lease_id(&self) -> u128 {
140 self.lease.lease_id()
141 }
142
143 /// Returns the expiry time (unix seconds) of the underlying memory
144 /// lease for external renewal management.
145 pub fn expires_at_unix_secs(&self) -> u64 {
146 self.lease.expires_at_unix_secs()
147 }
148
149 /// Read the current arrival count.
150 ///
151 /// Returns the number of parties that have called [`wait`](Self::wait) in
152 /// the current generation. Resets to 0 when the last party arrives and the
153 /// generation advances.
154 pub fn count(&self) -> Result<u64> {
155 read_u64(self.lease.mem(), self.base_offset + COUNT_OFFSET)
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Reset the mock host and acquire a fresh lease over a 4096-byte arena.
    fn setup() -> MemLease {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(4096);
        MemBuilder::new().acquire().expect("acquire lease")
    }

    /// Pretend `arrived` parties are already waiting by writing the count
    /// field directly.
    ///
    /// These tests are single-threaded, so a non-final `wait()` call can only
    /// time out; earlier arrivals are therefore simulated, and `wait()` is
    /// called for the final party only.
    fn simulate_arrivals(barrier: &FabricBarrier, arrived: u64) {
        write_u64(
            barrier.lease.mem(),
            barrier.base_offset + COUNT_OFFSET,
            arrived,
        )
        .expect("seed arrival count");
    }

    #[test]
    fn barrier_three_parties_sequential() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 3).expect("new barrier");

        assert_eq!(barrier.generation().unwrap(), 0);
        assert_eq!(barrier.count().unwrap(), 0);

        // Two parties have already arrived.
        simulate_arrivals(&barrier, 2);

        // Third party calls wait — should complete immediately
        let gen = barrier.wait(10).expect("wait");
        assert_eq!(gen, 1);
        assert_eq!(barrier.generation().unwrap(), 1);
        assert_eq!(barrier.count().unwrap(), 0); // reset after all arrive
    }

    #[test]
    fn barrier_reuse_across_generations() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 2).expect("new barrier");

        // Generation 0: simulate first party arrived
        simulate_arrivals(&barrier, 1);

        // Second party arrives — completes generation 0
        let gen = barrier.wait(10).expect("wait gen 0");
        assert_eq!(gen, 1);
        assert_eq!(barrier.count().unwrap(), 0);

        // Generation 1: simulate first party arrived again
        simulate_arrivals(&barrier, 1);

        // Second party arrives — completes generation 1
        let gen = barrier.wait(10).expect("wait gen 1");
        assert_eq!(gen, 2);
        assert_eq!(barrier.generation().unwrap(), 2);
    }

    #[test]
    fn barrier_timeout_when_not_all_arrive() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 3).expect("new barrier");

        // Only one of three parties arrives: wait() records the arrival and
        // polls, but the generation never changes.
        let result = barrier.wait(5);
        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);
    }

    #[test]
    fn barrier_honors_nonzero_base_offset() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 64, 2).expect("new barrier");

        simulate_arrivals(&barrier, 1);

        let gen = barrier.wait(10).expect("wait");
        assert_eq!(gen, 1);
        assert_eq!(barrier.generation().unwrap(), 1);
        assert_eq!(barrier.count().unwrap(), 0);
    }
}