1extern crate alloc;
17use alloc::vec::Vec;
18
19use grafos_fence::FenceEpoch;
20use grafos_std::block::{BlockLease, FabricBlock, BLOCK_SIZE};
21use grafos_std::error::{FabricError, Result};
22use serde::{de::DeserializeOwned, Deserialize, Serialize};
23
24const MAGIC: [u8; 4] = *b"DCHK";
26const DURABLE_VERSION: u32 = 1;
27const HEADER_BLOCK: u64 = 0;
28const DATA_START_BLOCK: u64 = 1;
29
30#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
35pub struct HandoffState<T> {
36 pub stage_id: u64,
38 pub generation: FenceEpoch,
40 pub locator: T,
42}
43
44fn checkpoint_to_block<T: Serialize>(state: &HandoffState<T>, blk: &FabricBlock) -> Result<()> {
47 let data = postcard::to_allocvec(state).map_err(|_| FabricError::IoError(-1))?;
48 let data_len = data.len() as u64;
49
50 let mut header = [0u8; BLOCK_SIZE];
52 header[0..4].copy_from_slice(&MAGIC);
53 header[4..8].copy_from_slice(&DURABLE_VERSION.to_le_bytes());
54 header[8..16].copy_from_slice(&data_len.to_le_bytes());
55 blk.write_block(HEADER_BLOCK, &header)?;
56
57 let num_blocks = data.len().div_ceil(BLOCK_SIZE);
59 let available = blk.num_blocks() as usize;
60 if num_blocks + 1 > available {
61 return Err(FabricError::CapacityExceeded);
62 }
63
64 for i in 0..num_blocks {
65 let start = i * BLOCK_SIZE;
66 let end = core::cmp::min(start + BLOCK_SIZE, data.len());
67 let mut block = [0u8; BLOCK_SIZE];
68 block[..end - start].copy_from_slice(&data[start..end]);
69 blk.write_block(DATA_START_BLOCK + i as u64, &block)?;
70 }
71
72 Ok(())
73}
74
75fn restore_from_block<T: DeserializeOwned>(blk: &FabricBlock) -> Result<HandoffState<T>> {
78 let header = blk.read_block(HEADER_BLOCK)?;
79
80 if header[0..4] != MAGIC {
81 return Err(FabricError::IoError(-2));
82 }
83 let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
84 if version != DURABLE_VERSION {
85 return Err(FabricError::IoError(-3));
86 }
87 let data_len = u64::from_le_bytes([
88 header[8], header[9], header[10], header[11], header[12], header[13], header[14],
89 header[15],
90 ]) as usize;
91
92 let num_blocks = data_len.div_ceil(BLOCK_SIZE);
93 let mut data = Vec::with_capacity(data_len);
94 for i in 0..num_blocks {
95 let block = blk.read_block(DATA_START_BLOCK + i as u64)?;
96 let remaining = data_len - data.len();
97 let take = core::cmp::min(BLOCK_SIZE, remaining);
98 data.extend_from_slice(&block[..take]);
99 }
100
101 postcard::from_bytes(&data).map_err(|_| FabricError::IoError(-1))
102}
103
104pub struct HandoffWriter<T> {
131 state: HandoffState<T>,
132 block_lease: BlockLease,
133}
134
135impl<T: Serialize + DeserializeOwned> HandoffWriter<T> {
136 pub fn new(state: HandoffState<T>, block_lease: BlockLease) -> Self {
141 Self { state, block_lease }
142 }
143
144 pub fn publish(&mut self, new_locator: T) -> Result<FenceEpoch> {
153 self.state.generation = self.state.generation.bump();
154 self.state.locator = new_locator;
155 checkpoint_to_block(&self.state, self.block_lease.block())?;
156 Ok(self.state.generation)
157 }
158
159 pub fn generation(&self) -> FenceEpoch {
161 self.state.generation
162 }
163
164 pub fn current_locator(&self) -> &T {
166 &self.state.locator
167 }
168
169 pub fn into_block_lease(self) -> BlockLease {
174 self.block_lease
175 }
176}
177
178pub struct HandoffReader<T> {
207 block_lease: BlockLease,
208 last_generation: Option<FenceEpoch>,
209 last_locator: Option<T>,
210}
211
212impl<T: Serialize + DeserializeOwned + Clone> HandoffReader<T> {
213 pub fn new(block_lease: BlockLease) -> Self {
215 Self {
216 block_lease,
217 last_generation: None,
218 last_locator: None,
219 }
220 }
221
222 pub fn poll(&mut self) -> Result<Option<HandoffState<T>>> {
228 let state: HandoffState<T> = restore_from_block(self.block_lease.block())?;
229
230 if self.last_generation == Some(state.generation) {
231 return Ok(None);
232 }
233
234 self.last_generation = Some(state.generation);
235 self.last_locator = Some(state.locator.clone());
236
237 Ok(Some(state))
238 }
239
240 pub fn changed(&self) -> bool {
242 self.last_generation.is_some()
243 }
244
245 pub fn last_generation(&self) -> Option<FenceEpoch> {
247 self.last_generation
248 }
249
250 pub fn last_locator(&self) -> Option<&T> {
252 self.last_locator.as_ref()
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use crate::locator::{MemRegionLocator, QueueLocator, RpcArenaLocator};
260 use grafos_std::block::BlockBuilder;
261 use grafos_std::host;
262
263 fn setup_block(num_blocks: u64) -> BlockLease {
264 host::reset_mock();
265 host::mock_set_fbbu_num_blocks(num_blocks);
266 BlockBuilder::new().acquire().expect("acquire")
267 }
268
269 #[test]
270 fn publish_bumps_generation() {
271 let lease = setup_block(64);
272 let initial = QueueLocator::new(1, 0, 0);
273 let mut writer = HandoffWriter::new(
274 HandoffState {
275 stage_id: 0,
276 generation: FenceEpoch::zero(),
277 locator: initial,
278 },
279 lease,
280 );
281
282 assert_eq!(writer.generation(), FenceEpoch::zero());
283
284 let new_loc = QueueLocator::new(2, 512, 1);
285 let gen = writer.publish(new_loc).expect("publish");
286 assert_eq!(gen, FenceEpoch::new(1));
287 assert_eq!(writer.generation(), FenceEpoch::new(1));
288 }
289
290 #[test]
291 fn reader_detects_change() {
292 let lease = setup_block(64);
293 let initial = QueueLocator::new(1, 0, 0);
294 let mut writer = HandoffWriter::new(
295 HandoffState {
296 stage_id: 0,
297 generation: FenceEpoch::zero(),
298 locator: initial,
299 },
300 lease,
301 );
302
303 let loc = QueueLocator::new(1, 0, 0);
304 writer.publish(loc).expect("publish");
305
306 let reader_lease = writer.into_block_lease();
309 let mut reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
310
311 let result = reader.poll().expect("poll");
312 assert!(result.is_some());
313 let state = result.unwrap();
314 assert_eq!(state.generation, FenceEpoch::new(1));
315 assert_eq!(state.locator.lease_id, 1);
316 }
317
318 #[test]
319 fn reader_returns_none_when_no_change() {
320 let lease = setup_block(64);
321 let initial = QueueLocator::new(1, 0, 0);
322 let mut writer = HandoffWriter::new(
323 HandoffState {
324 stage_id: 0,
325 generation: FenceEpoch::zero(),
326 locator: initial,
327 },
328 lease,
329 );
330
331 writer.publish(QueueLocator::new(1, 0, 0)).expect("publish");
332
333 let reader_lease = writer.into_block_lease();
334 let mut reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
335
336 let result = reader.poll().expect("poll 1");
338 assert!(result.is_some());
339
340 let result = reader.poll().expect("poll 2");
342 assert!(result.is_none());
343 }
344
345 #[test]
346 fn multiple_publishes_monotonic_generation() {
347 let lease = setup_block(64);
348 let initial = MemRegionLocator::new(1, 0, 1024);
349 let mut writer = HandoffWriter::new(
350 HandoffState {
351 stage_id: 0,
352 generation: FenceEpoch::zero(),
353 locator: initial,
354 },
355 lease,
356 );
357
358 let g1 = writer
359 .publish(MemRegionLocator::new(1, 0, 2048))
360 .expect("publish 1");
361 let g2 = writer
362 .publish(MemRegionLocator::new(1, 0, 4096))
363 .expect("publish 2");
364 let g3 = writer
365 .publish(MemRegionLocator::new(2, 0, 1024))
366 .expect("publish 3");
367
368 assert!(g2.is_newer(&g1));
369 assert!(g3.is_newer(&g2));
370 assert_eq!(g1.value(), 1);
371 assert_eq!(g2.value(), 2);
372 assert_eq!(g3.value(), 3);
373 }
374
375 #[test]
376 fn corrupt_record_returns_error() {
377 let lease = setup_block(64);
378 let mut garbage = [0u8; BLOCK_SIZE];
380 garbage[0..4].copy_from_slice(b"BAAD");
381 lease.block().write_block(0, &garbage).expect("write");
382
383 let mut reader: HandoffReader<QueueLocator> = HandoffReader::new(lease);
384
385 let result = reader.poll();
386 assert!(result.is_err());
387 }
388
389 #[test]
390 fn current_locator_reflects_publish() {
391 let lease = setup_block(64);
392 let initial = RpcArenaLocator::new(1, 0, 4096, 0);
393 let mut writer = HandoffWriter::new(
394 HandoffState {
395 stage_id: 0,
396 generation: FenceEpoch::zero(),
397 locator: initial,
398 },
399 lease,
400 );
401
402 assert_eq!(writer.current_locator().request_offset, 0);
403
404 let new_loc = RpcArenaLocator::new(1, 1024, 8192, 1);
405 writer.publish(new_loc).expect("publish");
406
407 assert_eq!(writer.current_locator().request_offset, 1024);
408 assert_eq!(writer.current_locator().response_offset, 8192);
409 }
410
411 #[test]
412 fn reader_last_locator_and_generation() {
413 let lease = setup_block(64);
414 let initial = QueueLocator::new(1, 0, 0);
415 let mut writer = HandoffWriter::new(
416 HandoffState {
417 stage_id: 0,
418 generation: FenceEpoch::zero(),
419 locator: initial,
420 },
421 lease,
422 );
423
424 writer
425 .publish(QueueLocator::new(42, 256, 7))
426 .expect("publish");
427
428 let reader_lease = writer.into_block_lease();
429 let mut reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
430
431 assert!(reader.last_generation().is_none());
433 assert!(reader.last_locator().is_none());
434
435 reader.poll().expect("poll");
436
437 assert_eq!(reader.last_generation(), Some(FenceEpoch::new(1)));
438 let loc = reader.last_locator().unwrap();
439 assert_eq!(loc.lease_id, 42);
440 assert_eq!(loc.base_offset, 256);
441 }
442
443 #[test]
444 fn handoff_state_serialization_roundtrip() {
445 let state = HandoffState {
446 stage_id: 99,
447 generation: FenceEpoch::new(7),
448 locator: MemRegionLocator::new(0xABCD, 128, 512),
449 };
450 let bytes = postcard::to_allocvec(&state).expect("serialize");
451 let decoded: HandoffState<MemRegionLocator> =
452 postcard::from_bytes(&bytes).expect("deserialize");
453 assert_eq!(state, decoded);
454 }
455}