grafos_sync/
barrier.rs

1//! Distributed barrier for multi-party phase synchronization.
2//!
//! A [`FabricBarrier`] holds each participant at a synchronization point
//! until all `num_parties` have arrived. Waiting is done by repeatedly reading
//! the shared generation counter, up to a caller-supplied number of polls. On
//! the last arrival, the generation counter advances and the arrival count
//! resets, making the barrier reusable across successive phases.
7//!
//! # Memory layout at `base_offset`
//!
//! ```text
//! offset +0:  generation   (u64, 8 bytes)  — incremented when all parties arrive
//! offset +8:  count        (u64, 8 bytes)  — parties that have called wait() this generation
//! offset +16: num_parties  (u32, 4 bytes)  — total parties expected
//! ```
//!
//! All fields are stored little-endian. Total header: 20 bytes.
17
18use grafos_std::error::{FabricError, Result};
19use grafos_std::mem::MemLease;
20
/// Offset (relative to `base_offset`) of the `u64` generation counter.
const GENERATION_OFFSET: u64 = 0;
/// Offset of the `u64` arrival count, directly after the 8-byte generation word.
const COUNT_OFFSET: u64 = GENERATION_OFFSET + 8;
/// Offset of the `u32` party count, directly after the 8-byte arrival count.
const NUM_PARTIES_OFFSET: u64 = COUNT_OFFSET + 8;
24
/// Distributed barrier backed by a fabric memory lease.
///
/// All `num_parties` participants must call [`wait`](FabricBarrier::wait)
/// before any of them proceed. After all parties arrive, the generation
/// counter increments and the barrier resets for reuse across successive
/// synchronization rounds.
///
/// Waiting is bounded. A party that is not the last to arrive reads the shared
/// generation counter at most `max_polls` times, then returns
/// [`FabricError::LeaseExpired`]. This is how a crashed or missing party shows
/// up to the others. The error means the poll budget ran out, not necessarily
/// that the lease itself expired.
///
/// NOTE(review): arrivals are recorded with a plain read of the shared count
/// followed by a write, not an atomic read-modify-write. Confirm that
/// concurrent `wait()` calls are serialized, or that the fabric memory offers
/// an atomic alternative.
///
/// # Example
///
/// ```rust
/// # grafos_std::host::reset_mock();
/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
/// use grafos_sync::FabricBarrier;
/// use grafos_std::mem::MemBuilder;
///
/// let lease = MemBuilder::new().acquire().unwrap();
/// let barrier = FabricBarrier::new(lease, 0, 1).unwrap();
///
/// // With 1 party, wait() completes immediately
/// let gen = barrier.wait(10).unwrap();
/// assert_eq!(gen, 1);
/// ```
pub struct FabricBarrier {
    /// Lease over the fabric memory that holds the barrier header.
    lease: MemLease,
    /// Byte offset of the barrier header within the leased memory.
    base_offset: u64,
    /// Number of participants. This is a local copy of the value `new` writes
    /// into the header; `wait` compares against this copy rather than
    /// re-reading the header.
    num_parties: u32,
}
55
56fn read_u64(mem: &grafos_std::mem::FabricMem, offset: u64) -> Result<u64> {
57    let data = mem.read(offset, 8)?;
58    if data.len() < 8 {
59        return Ok(0);
60    }
61    let mut buf = [0u8; 8];
62    buf.copy_from_slice(&data[..8]);
63    Ok(u64::from_le_bytes(buf))
64}
65
66fn write_u64(mem: &grafos_std::mem::FabricMem, offset: u64, val: u64) -> Result<()> {
67    mem.write(offset, &val.to_le_bytes())
68}
69
70fn write_u32(mem: &grafos_std::mem::FabricMem, offset: u64, val: u32) -> Result<()> {
71    mem.write(offset, &val.to_le_bytes())
72}
73
74impl FabricBarrier {
75    /// Create a new barrier for `num_parties` participants.
76    ///
77    /// Initializes the barrier state in leased memory at `base_offset`.
78    pub fn new(lease: MemLease, base_offset: u64, num_parties: u32) -> Result<Self> {
79        let mem = lease.mem();
80        write_u64(mem, base_offset + GENERATION_OFFSET, 0)?;
81        write_u64(mem, base_offset + COUNT_OFFSET, 0)?;
82        write_u32(mem, base_offset + NUM_PARTIES_OFFSET, num_parties)?;
83
84        Ok(FabricBarrier {
85            lease,
86            base_offset,
87            num_parties,
88        })
89    }
90
91    /// Wait at the barrier.
92    ///
93    /// Increments the arrival count. If this is the last party to arrive,
94    /// the generation is incremented and the count is reset to zero. Otherwise,
95    /// polls until the generation changes, indicating all parties have arrived.
96    ///
97    /// Returns the new generation number on success.
98    ///
99    /// `max_polls` limits how many polling iterations before returning
100    /// [`FabricError::LeaseExpired`].
101    pub fn wait(&self, max_polls: u32) -> Result<u64> {
102        let mem = self.lease.mem();
103        let base = self.base_offset;
104
105        let gen_before = read_u64(mem, base + GENERATION_OFFSET)?;
106        let count = read_u64(mem, base + COUNT_OFFSET)?;
107        let new_count = count + 1;
108
109        if new_count >= self.num_parties as u64 {
110            // We are the last party — advance generation, reset count
111            write_u64(mem, base + GENERATION_OFFSET, gen_before + 1)?;
112            write_u64(mem, base + COUNT_OFFSET, 0)?;
113            return Ok(gen_before + 1);
114        }
115
116        // Not the last — write our arrival and wait for generation to change
117        write_u64(mem, base + COUNT_OFFSET, new_count)?;
118
119        for _ in 0..max_polls {
120            let gen_now = read_u64(mem, base + GENERATION_OFFSET)?;
121            if gen_now != gen_before {
122                return Ok(gen_now);
123            }
124        }
125
126        Err(FabricError::LeaseExpired)
127    }
128
129    /// Read the current generation counter.
130    ///
131    /// The generation starts at 0 and increments by one each time all
132    /// `num_parties` have arrived and the barrier completes a round.
133    pub fn generation(&self) -> Result<u64> {
134        read_u64(self.lease.mem(), self.base_offset + GENERATION_OFFSET)
135    }
136
137    /// Returns the lease ID of the underlying memory lease for external
138    /// renewal management (e.g. via [`grafos_leasekit::RenewalManager`]).
139    pub fn lease_id(&self) -> u128 {
140        self.lease.lease_id()
141    }
142
143    /// Returns the expiry time (unix seconds) of the underlying memory
144    /// lease for external renewal management.
145    pub fn expires_at_unix_secs(&self) -> u64 {
146        self.lease.expires_at_unix_secs()
147    }
148
149    /// Read the current arrival count.
150    ///
151    /// Returns the number of parties that have called [`wait`](Self::wait) in
152    /// the current generation. Resets to 0 when the last party arrives and the
153    /// generation advances.
154    pub fn count(&self) -> Result<u64> {
155        read_u64(self.lease.mem(), self.base_offset + COUNT_OFFSET)
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Reset the host mock and acquire a fresh lease for one test.
    fn setup() -> MemLease {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(4096);
        MemBuilder::new().acquire().expect("acquire lease")
    }

    /// Pretend `arrived` parties have already called `wait()` by writing the
    /// arrival count directly.
    ///
    /// The tests are single-threaded, so a non-last party's `wait()` could
    /// only poll and time out. Simulating the earlier arrivals lets a test
    /// drive the last-arrival path.
    fn simulate_arrivals(barrier: &FabricBarrier, arrived: u64) {
        let mem = barrier.lease.mem();
        write_u64(mem, barrier.base_offset + COUNT_OFFSET, arrived).expect("write count");
    }

    #[test]
    fn barrier_three_parties_sequential() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 3).expect("new barrier");

        assert_eq!(barrier.generation().unwrap(), 0);
        assert_eq!(barrier.count().unwrap(), 0);

        // Two parties already arrived; the third completes the round.
        simulate_arrivals(&barrier, 2);
        let generation = barrier.wait(10).expect("wait");
        assert_eq!(generation, 1);
        assert_eq!(barrier.generation().unwrap(), 1);
        assert_eq!(barrier.count().unwrap(), 0); // reset after all arrive
    }

    #[test]
    fn barrier_reuse_across_generations() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 2).expect("new barrier");

        // Generation 0: first party arrived, second completes the round.
        simulate_arrivals(&barrier, 1);
        let generation = barrier.wait(10).expect("wait gen 0");
        assert_eq!(generation, 1);
        assert_eq!(barrier.count().unwrap(), 0);

        // Generation 1: same again.
        simulate_arrivals(&barrier, 1);
        let generation = barrier.wait(10).expect("wait gen 1");
        assert_eq!(generation, 2);
        assert_eq!(barrier.generation().unwrap(), 2);
        assert_eq!(barrier.count().unwrap(), 0);
    }

    #[test]
    fn barrier_single_party_completes_without_polling() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 1).expect("new barrier");

        // The last arrival never polls, so even a zero poll budget succeeds.
        assert_eq!(barrier.wait(0).expect("first round"), 1);
        assert_eq!(barrier.wait(0).expect("second round"), 2);
        assert_eq!(barrier.count().unwrap(), 0);
    }

    #[test]
    fn barrier_respects_base_offset() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 32, 2).expect("new barrier");

        assert_eq!(barrier.generation().unwrap(), 0);
        simulate_arrivals(&barrier, 1);
        assert_eq!(barrier.wait(10).expect("wait"), 1);
        assert_eq!(barrier.generation().unwrap(), 1);
        assert_eq!(barrier.count().unwrap(), 0);
    }

    #[test]
    fn barrier_timeout_when_not_all_arrive() {
        let lease = setup();
        let barrier = FabricBarrier::new(lease, 0, 3).expect("new barrier");

        // Only one party arrives (count 0 -> 1). The generation never
        // changes, so polling exhausts its budget.
        let result = barrier.wait(5);
        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);

        // The timed-out arrival is not rolled back, and no round completed.
        assert_eq!(barrier.count().unwrap(), 1);
        assert_eq!(barrier.generation().unwrap(), 0);
    }
}