grafos_locator/
handoff.rs

1//! Handoff records: durable writer/reader for locator rendezvous.
2//!
3//! A [`HandoffWriter`] publishes locators with monotonically increasing
4//! [`FenceEpoch`] generations to block storage. A [`HandoffReader`] polls
5//! the same block storage and detects when a new generation has been written.
6//!
7//! The underlying persistence uses the same checkpoint shape as
8//! `grafos-collections` durable snapshots: a header block followed by
9//! postcard-serialized data blocks.
10//!
11//! # Fail-closed
12//!
13//! Corrupt or unreadable records cause the reader to return an error, never
14//! a stale or default value.
15
16extern crate alloc;
17use alloc::vec::Vec;
18
19use grafos_fence::FenceEpoch;
20use grafos_std::block::{BlockLease, FabricBlock, BLOCK_SIZE};
21use grafos_std::error::{FabricError, Result};
22use serde::{de::DeserializeOwned, Deserialize, Serialize};
23
// Durable checkpoint constants (matching the grafos-collections Durable format).

/// Magic bytes stored in bytes `0..4` of the header block.
const MAGIC: [u8; 4] = *b"DCHK";
/// Format version stored little-endian in bytes `4..8` of the header block.
const DURABLE_VERSION: u32 = 1;
/// Index of the block that holds the checkpoint header.
const HEADER_BLOCK: u64 = 0;
/// Index of the first block that holds serialized payload bytes; the payload
/// starts immediately after the header block.
const DATA_START_BLOCK: u64 = HEADER_BLOCK + 1;
29
30/// The state persisted to block storage by [`HandoffWriter`].
31///
32/// Contains the locator value, a monotonic generation counter, and a
33/// stage identifier for multi-phase handoff protocols.
34#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
35pub struct HandoffState<T> {
36    /// Logical stage identifier for multi-stage workflows.
37    pub stage_id: u64,
38    /// Monotonic generation used for stale-state rejection.
39    pub generation: FenceEpoch,
40    /// Published locator payload.
41    pub locator: T,
42}
43
44/// Checkpoint a `HandoffState<T>` to a `FabricBlock` handle using the Durable
45/// format (header block + postcard data blocks).
46fn checkpoint_to_block<T: Serialize>(state: &HandoffState<T>, blk: &FabricBlock) -> Result<()> {
47    let data = postcard::to_allocvec(state).map_err(|_| FabricError::IoError(-1))?;
48    let data_len = data.len() as u64;
49
50    // Write header block
51    let mut header = [0u8; BLOCK_SIZE];
52    header[0..4].copy_from_slice(&MAGIC);
53    header[4..8].copy_from_slice(&DURABLE_VERSION.to_le_bytes());
54    header[8..16].copy_from_slice(&data_len.to_le_bytes());
55    blk.write_block(HEADER_BLOCK, &header)?;
56
57    // Write data blocks
58    let num_blocks = data.len().div_ceil(BLOCK_SIZE);
59    let available = blk.num_blocks() as usize;
60    if num_blocks + 1 > available {
61        return Err(FabricError::CapacityExceeded);
62    }
63
64    for i in 0..num_blocks {
65        let start = i * BLOCK_SIZE;
66        let end = core::cmp::min(start + BLOCK_SIZE, data.len());
67        let mut block = [0u8; BLOCK_SIZE];
68        block[..end - start].copy_from_slice(&data[start..end]);
69        blk.write_block(DATA_START_BLOCK + i as u64, &block)?;
70    }
71
72    Ok(())
73}
74
75/// Restore a `HandoffState<T>` from a `FabricBlock` handle using the Durable
76/// format. Returns an error on corrupt data (fail closed).
77fn restore_from_block<T: DeserializeOwned>(blk: &FabricBlock) -> Result<HandoffState<T>> {
78    let header = blk.read_block(HEADER_BLOCK)?;
79
80    if header[0..4] != MAGIC {
81        return Err(FabricError::IoError(-2));
82    }
83    let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
84    if version != DURABLE_VERSION {
85        return Err(FabricError::IoError(-3));
86    }
87    let data_len = u64::from_le_bytes([
88        header[8], header[9], header[10], header[11], header[12], header[13], header[14],
89        header[15],
90    ]) as usize;
91
92    let num_blocks = data_len.div_ceil(BLOCK_SIZE);
93    let mut data = Vec::with_capacity(data_len);
94    for i in 0..num_blocks {
95        let block = blk.read_block(DATA_START_BLOCK + i as u64)?;
96        let remaining = data_len - data.len();
97        let take = core::cmp::min(BLOCK_SIZE, remaining);
98        data.extend_from_slice(&block[..take]);
99    }
100
101    postcard::from_bytes(&data).map_err(|_| FabricError::IoError(-1))
102}
103
104/// Writes locator handoff records to block storage.
105///
106/// Each [`publish`](HandoffWriter::publish) call bumps the generation,
107/// updates the locator, and checkpoints the state to the underlying
108/// [`BlockLease`].
109///
110/// # Example
111///
112/// ```rust,no_run
113/// use grafos_locator::handoff::{HandoffWriter, HandoffState};
114/// use grafos_locator::locator::QueueLocator;
115/// use grafos_fence::FenceEpoch;
116/// use grafos_std::block::BlockBuilder;
117///
118/// # grafos_std::host::reset_mock();
119/// # grafos_std::host::mock_set_fbbu_num_blocks(64);
120/// let lease = BlockBuilder::new().acquire().unwrap();
121/// let initial = QueueLocator::new(1, 0, 0);
122/// let mut writer = HandoffWriter::new(
123///     HandoffState { stage_id: 0, generation: FenceEpoch::zero(), locator: initial },
124///     lease,
125/// );
126/// let new_loc = QueueLocator::new(2, 512, 1);
127/// let gen = writer.publish(new_loc).unwrap();
128/// assert_eq!(gen.value(), 1);
129/// ```
130pub struct HandoffWriter<T> {
131    state: HandoffState<T>,
132    block_lease: BlockLease,
133}
134
135impl<T: Serialize + DeserializeOwned> HandoffWriter<T> {
136    /// Create a new handoff writer with the given initial state.
137    ///
138    /// Does **not** checkpoint the initial state to disk. Call
139    /// [`publish`](Self::publish) to persist.
140    pub fn new(state: HandoffState<T>, block_lease: BlockLease) -> Self {
141        Self { state, block_lease }
142    }
143
144    /// Publish a new locator, bumping the generation and checkpointing to
145    /// block storage.
146    ///
147    /// Returns the new [`FenceEpoch`] after the bump.
148    ///
149    /// # Errors
150    ///
151    /// Propagates block I/O or serialization errors.
152    pub fn publish(&mut self, new_locator: T) -> Result<FenceEpoch> {
153        self.state.generation = self.state.generation.bump();
154        self.state.locator = new_locator;
155        checkpoint_to_block(&self.state, self.block_lease.block())?;
156        Ok(self.state.generation)
157    }
158
159    /// Returns the current generation.
160    pub fn generation(&self) -> FenceEpoch {
161        self.state.generation
162    }
163
164    /// Returns a reference to the current locator.
165    pub fn current_locator(&self) -> &T {
166        &self.state.locator
167    }
168
169    /// Consume the writer and return the underlying block lease.
170    ///
171    /// This allows transferring the lease to a [`HandoffReader`] so that
172    /// both sides access the same underlying block storage.
173    pub fn into_block_lease(self) -> BlockLease {
174        self.block_lease
175    }
176}
177
178/// Reads locator handoff records from block storage.
179///
180/// Each [`poll`](HandoffReader::poll) call reads the block lease, deserializes
181/// the handoff state, and returns `Some(state)` if the generation has changed
182/// since the last successful poll.
183///
184/// # Fail-closed
185///
186/// If the block data is corrupt or deserialization fails, `poll` returns an
187/// error — never a stale or default value.
188///
189/// # Example
190///
191/// ```rust,no_run
192/// use grafos_locator::handoff::HandoffReader;
193/// use grafos_locator::locator::QueueLocator;
194/// use grafos_std::block::BlockBuilder;
195///
196/// # grafos_std::host::reset_mock();
197/// # grafos_std::host::mock_set_fbbu_num_blocks(64);
198/// let lease = BlockBuilder::new().acquire().unwrap();
199/// let mut reader: HandoffReader<QueueLocator> = HandoffReader::new(lease);
200/// match reader.poll() {
201///     Ok(Some(state)) => println!("new locator: {:?}", state.locator),
202///     Ok(None) => println!("no change"),
203///     Err(e) => println!("read error: {e}"),
204/// }
205/// ```
206pub struct HandoffReader<T> {
207    block_lease: BlockLease,
208    last_generation: Option<FenceEpoch>,
209    last_locator: Option<T>,
210}
211
212impl<T: Serialize + DeserializeOwned + Clone> HandoffReader<T> {
213    /// Create a new handoff reader attached to the given block lease.
214    pub fn new(block_lease: BlockLease) -> Self {
215        Self {
216            block_lease,
217            last_generation: None,
218            last_locator: None,
219        }
220    }
221
222    /// Poll the block storage for a new handoff state.
223    ///
224    /// Returns `Ok(Some(state))` if the generation has changed since the
225    /// last successful poll, `Ok(None)` if unchanged, or `Err` on I/O or
226    /// deserialization failure.
227    pub fn poll(&mut self) -> Result<Option<HandoffState<T>>> {
228        let state: HandoffState<T> = restore_from_block(self.block_lease.block())?;
229
230        if self.last_generation == Some(state.generation) {
231            return Ok(None);
232        }
233
234        self.last_generation = Some(state.generation);
235        self.last_locator = Some(state.locator.clone());
236
237        Ok(Some(state))
238    }
239
240    /// Returns `true` if the last [`poll`](Self::poll) returned a new generation.
241    pub fn changed(&self) -> bool {
242        self.last_generation.is_some()
243    }
244
245    /// Returns the last generation seen by [`poll`](Self::poll), if any.
246    pub fn last_generation(&self) -> Option<FenceEpoch> {
247        self.last_generation
248    }
249
250    /// Returns a reference to the last locator seen by [`poll`](Self::poll), if any.
251    pub fn last_locator(&self) -> Option<&T> {
252        self.last_locator.as_ref()
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use crate::locator::{MemRegionLocator, QueueLocator, RpcArenaLocator};
    use grafos_std::block::BlockBuilder;
    use grafos_std::host;

    /// Reset the mock host, size its block device, and acquire a lease on it.
    fn setup_block(num_blocks: u64) -> BlockLease {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(num_blocks);
        BlockBuilder::new().acquire().expect("acquire")
    }

    /// Build a writer at stage 0 / generation zero over `lease`.
    fn writer_over<T: Serialize + DeserializeOwned>(
        lease: BlockLease,
        initial: T,
    ) -> HandoffWriter<T> {
        let state = HandoffState {
            stage_id: 0,
            generation: FenceEpoch::zero(),
            locator: initial,
        };
        HandoffWriter::new(state, lease)
    }

    #[test]
    fn publish_bumps_generation() {
        let mut writer = writer_over(setup_block(64), QueueLocator::new(1, 0, 0));

        // Construction alone must not advance the generation.
        assert_eq!(writer.generation(), FenceEpoch::zero());

        let epoch = writer
            .publish(QueueLocator::new(2, 512, 1))
            .expect("publish");
        assert_eq!(epoch, FenceEpoch::new(1));
        assert_eq!(writer.generation(), FenceEpoch::new(1));
    }

    #[test]
    fn reader_detects_change() {
        let mut writer = writer_over(setup_block(64), QueueLocator::new(1, 0, 0));
        writer.publish(QueueLocator::new(1, 0, 0)).expect("publish");

        // Hand the writer's lease to the reader so both sides address the
        // same lease_id and therefore the same mock block data.
        let mut reader: HandoffReader<QueueLocator> =
            HandoffReader::new(writer.into_block_lease());

        let polled = reader.poll().expect("poll");
        assert!(polled.is_some());
        let state = polled.unwrap();
        assert_eq!(state.generation, FenceEpoch::new(1));
        assert_eq!(state.locator.lease_id, 1);
    }

    #[test]
    fn reader_returns_none_when_no_change() {
        let mut writer = writer_over(setup_block(64), QueueLocator::new(1, 0, 0));
        writer.publish(QueueLocator::new(1, 0, 0)).expect("publish");

        let mut reader: HandoffReader<QueueLocator> =
            HandoffReader::new(writer.into_block_lease());

        // The first poll observes the published generation...
        let first = reader.poll().expect("poll 1");
        assert!(first.is_some());

        // ...and a second poll at the same generation reports no change.
        let second = reader.poll().expect("poll 2");
        assert!(second.is_none());
    }

    #[test]
    fn multiple_publishes_monotonic_generation() {
        let mut writer = writer_over(setup_block(64), MemRegionLocator::new(1, 0, 1024));

        let g1 = writer
            .publish(MemRegionLocator::new(1, 0, 2048))
            .expect("publish 1");
        let g2 = writer
            .publish(MemRegionLocator::new(1, 0, 4096))
            .expect("publish 2");
        let g3 = writer
            .publish(MemRegionLocator::new(2, 0, 1024))
            .expect("publish 3");

        assert!(g2.is_newer(&g1));
        assert!(g3.is_newer(&g2));
        assert_eq!(g1.value(), 1);
        assert_eq!(g2.value(), 2);
        assert_eq!(g3.value(), 3);
    }

    #[test]
    fn corrupt_record_returns_error() {
        let lease = setup_block(64);

        // Overwrite block 0 with a header whose magic bytes are wrong.
        let mut garbage = [0u8; BLOCK_SIZE];
        garbage[0..4].copy_from_slice(b"BAAD");
        lease.block().write_block(0, &garbage).expect("write");

        let mut reader: HandoffReader<QueueLocator> = HandoffReader::new(lease);
        assert!(reader.poll().is_err());
    }

    #[test]
    fn current_locator_reflects_publish() {
        let mut writer = writer_over(setup_block(64), RpcArenaLocator::new(1, 0, 4096, 0));

        assert_eq!(writer.current_locator().request_offset, 0);

        writer
            .publish(RpcArenaLocator::new(1, 1024, 8192, 1))
            .expect("publish");

        assert_eq!(writer.current_locator().request_offset, 1024);
        assert_eq!(writer.current_locator().response_offset, 8192);
    }

    #[test]
    fn reader_last_locator_and_generation() {
        let mut writer = writer_over(setup_block(64), QueueLocator::new(1, 0, 0));
        writer
            .publish(QueueLocator::new(42, 256, 7))
            .expect("publish");

        let mut reader: HandoffReader<QueueLocator> =
            HandoffReader::new(writer.into_block_lease());

        // Nothing is remembered until a poll has succeeded.
        assert!(reader.last_generation().is_none());
        assert!(reader.last_locator().is_none());

        reader.poll().expect("poll");

        assert_eq!(reader.last_generation(), Some(FenceEpoch::new(1)));
        let loc = reader.last_locator().unwrap();
        assert_eq!(loc.lease_id, 42);
        assert_eq!(loc.base_offset, 256);
    }

    #[test]
    fn handoff_state_serialization_roundtrip() {
        let original = HandoffState {
            stage_id: 99,
            generation: FenceEpoch::new(7),
            locator: MemRegionLocator::new(0xABCD, 128, 512),
        };

        let encoded = postcard::to_allocvec(&original).expect("serialize");
        let decoded: HandoffState<MemRegionLocator> =
            postcard::from_bytes(&encoded).expect("deserialize");

        assert_eq!(original, decoded);
    }
}