grafos_pipeline/checkpoint.rs
1//! Durable checkpoint support for pipeline stage state.
2
3extern crate alloc;
4use alloc::boxed::Box;
5use alloc::format;
6use alloc::vec::Vec;
7
8use grafos_std::block::{BlockLease, BLOCK_SIZE};
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::error::EdgeError;
12
/// Four-byte tag stored at the very start of the header block; `restore`
/// rejects any header that does not begin with it.
const MAGIC: [u8; 4] = [b'P', b'C', b'H', b'K'];
/// Format revision written into the header and required on restore.
const VERSION: u32 = 1;
/// Index of the block that holds the checkpoint header.
const HEADER_BLOCK: u64 = 0;
/// Index of the first block holding serialized state; data directly
/// follows the header block.
const DATA_START_BLOCK: u64 = HEADER_BLOCK + 1;
18
19// ---------------------------------------------------------------------------
20// CheckpointFlushObserver — Phase 218.2 / slice 96
21// ---------------------------------------------------------------------------
22
/// Observer that is told when a checkpoint flush begins and when it ends.
///
/// Introduced in Phase 218.2 / slice 96 so the scheduler-service
/// `PreemptionManager` can flag a lease as
/// `NonPreemptibleReason::CheckpointInProgress` while a flush is in
/// flight.
///
/// The hook is purely observational. Both methods return `()`, so an
/// implementation has no channel for handing an error back to the flush
/// and cannot change whether the flush succeeds. Implementations MUST NOT
/// block. When a notification cannot be honoured (for example the lease
/// is no longer tracked by the preemption manager because it was
/// released, expired, or fenced concurrently), the observer logs that
/// internally; the checkpoint flush is NOT aborted. The checkpoint is the
/// source of truth for stage durability; the observer is only a sideband
/// signal to the scheduler.
///
/// # Why a trait callback rather than a direct cross-crate call
///
/// `grafos-pipeline` is `no_std + alloc` and deliberately does not depend
/// on `grafos-scheduler-service`, which is `std`-only and pulls in the
/// http server, audit chain, and so on. Calling it directly would either
/// force the pipeline crate into `std` or drag scheduler-service into
/// `no_std` builds. With the trait, both sides stay cycle-free: the
/// producer (pipeline) calls a small trait and the consumer
/// (scheduler-service) implements it.
///
/// # Lifecycle contract
///
/// - `on_flush_start(lease_id)` is called exactly ONCE at the beginning
///   of a `checkpoint()` call, provided both the observer and the
///   `lease_id` are wired.
/// - `on_flush_complete(lease_id)` is called exactly ONCE when that
///   `checkpoint()` call ends, on the success path AND on every failure
///   path. `checkpoint()` achieves this with a guard whose `Drop` issues
///   the call, so producers do not have to sprinkle clears at each error
///   site.
pub trait CheckpointFlushObserver {
    /// Invoked at the START of a checkpoint flush.
    ///
    /// Implementations should mark the lease identified by `lease_id`
    /// as transiently non-preemptible.
    fn on_flush_start(&self, lease_id: u128);

    /// Invoked at the END of a checkpoint flush, whether it succeeded or
    /// failed.
    ///
    /// Implementations should clear the non-preemptible marker for
    /// `lease_id`.
    fn on_flush_complete(&self, lease_id: u128);
}
65
66/// RAII guard that calls `on_flush_complete` on drop. Ensures the
67/// marker is cleared even if the flush errors part-way through. Per
68/// the slice 96 requirement: "if a flush fails partway, the marker
69/// MUST be cleared so the lease becomes preemptible again."
70struct FlushGuard<'a> {
71 observer: &'a dyn CheckpointFlushObserver,
72 lease_id: u128,
73}
74
75impl<'a> Drop for FlushGuard<'a> {
76 fn drop(&mut self) {
77 self.observer.on_flush_complete(self.lease_id);
78 }
79}
80
81/// A stage state wrapper with durable checkpoint/restore support.
82///
83/// Serializes the state to a [`BlockLease`] on checkpoint and
84/// deserializes on restore, using a format compatible with the
85/// grafos-collections Durable pattern.
86///
87/// # Fail-closed
88///
89/// If a checkpoint write fails, the error is propagated immediately.
90/// Callers should halt the stage rather than continue with an
91/// uncheckpointed state.
92///
93/// # Preemption protection (Phase 218.2 / slice 96)
94///
95/// When constructed with [`CheckpointedStageState::with_observer`],
96/// the wrapper notifies a [`CheckpointFlushObserver`] at flush start
97/// and complete so the scheduler-service can mark the lease as
98/// `NonPreemptibleReason::CheckpointInProgress`. The marker is
99/// cleared on BOTH the success path and every failure path via an
100/// RAII guard. Wrappers constructed via [`CheckpointedStageState::new`]
101/// continue to flush without the sideband notification (back-compat).
102pub struct CheckpointedStageState<T> {
103 state: T,
104 block_lease: BlockLease,
105 /// Identifier of the lease backing this stage's durable state.
106 /// `None` for back-compat callers; `Some(_)` when the producer
107 /// has named the lease whose preemption window the flush
108 /// occupies.
109 lease_id: Option<u128>,
110 /// Sideband observer notified at flush start / complete. `None`
111 /// for back-compat callers; `Some(_)` when wired via
112 /// [`Self::with_observer`].
113 observer: Option<Box<dyn CheckpointFlushObserver + Send + Sync>>,
114}
115
116impl<T: Serialize + DeserializeOwned> CheckpointedStageState<T> {
117 /// Create a new checkpointed state wrapper.
118 ///
119 /// Does **not** write an initial checkpoint. Call [`checkpoint`](Self::checkpoint)
120 /// to persist.
121 ///
122 /// Constructed without a [`CheckpointFlushObserver`] — checkpoint
123 /// flushes do NOT emit preemption-protection sideband notifications.
124 /// Use [`Self::with_observer`] to attach the marker producer for
125 /// `NonPreemptibleReason::CheckpointInProgress` (Phase 218.2 /
126 /// slice 96).
127 pub fn new(state: T, block_lease: BlockLease) -> Self {
128 Self {
129 state,
130 block_lease,
131 lease_id: None,
132 observer: None,
133 }
134 }
135
136 /// Phase 218.2 / slice 96 — attach a [`CheckpointFlushObserver`]
137 /// and the `lease_id` of the lease whose preemption window the
138 /// flush occupies. Calls to [`Self::checkpoint`] then notify the
139 /// observer at flush start AND on flush complete (both success
140 /// AND failure paths, via an RAII guard).
141 ///
142 /// The producer is responsible for naming the correct
143 /// `lease_id` — the lease backing the stage's durable
144 /// checkpoint storage ([`BlockLease`]). The observer is
145 /// typically the scheduler-service `PreemptionManager`'s
146 /// checkpoint adapter (see
147 /// `grafos_scheduler_service::preemption::PreemptionCheckpointObserver`).
148 pub fn with_observer(
149 state: T,
150 block_lease: BlockLease,
151 lease_id: u128,
152 observer: Box<dyn CheckpointFlushObserver + Send + Sync>,
153 ) -> Self {
154 Self {
155 state,
156 block_lease,
157 lease_id: Some(lease_id),
158 observer: Some(observer),
159 }
160 }
161
162 /// Serialize the current state to block storage.
163 ///
164 /// Uses a header block + data block layout:
165 /// - Block 0: magic (4) + version (4) + data_len (8)
166 /// - Blocks 1..N: postcard-serialized state bytes
167 ///
168 /// # Preemption protection (Phase 218.2 / slice 96)
169 ///
170 /// When this `CheckpointedStageState` was constructed via
171 /// [`Self::with_observer`], the observer's `on_flush_start` is
172 /// called at the top of this method and `on_flush_complete` is
173 /// called when the flush returns — on BOTH the success path and
174 /// every failure path. The notification is implemented via an
175 /// RAII guard so partway-through errors don't leave the lease
176 /// stuck in `CheckpointInProgress`.
177 ///
178 /// # Errors
179 ///
180 /// Returns [`EdgeError::CheckpointFailed`] on serialization or I/O failure.
181 pub fn checkpoint(&mut self) -> Result<(), EdgeError> {
182 // Phase 218.2 / slice 96 — sideband flush-protection
183 // notification. The guard's Drop ensures `on_flush_complete`
184 // runs on every exit path (Ok, Err, or panic-unwind).
185 let _flush_guard = match (self.observer.as_deref(), self.lease_id) {
186 (Some(observer), Some(lease_id)) => {
187 observer.on_flush_start(lease_id);
188 Some(FlushGuard { observer, lease_id })
189 }
190 _ => None,
191 };
192
193 let data = postcard::to_allocvec(&self.state)
194 .map_err(|e| EdgeError::CheckpointFailed(format!("serialize: {e}")))?;
195
196 let data_len = data.len() as u64;
197
198 // Write header block
199 let mut header = [0u8; BLOCK_SIZE];
200 header[0..4].copy_from_slice(&MAGIC);
201 header[4..8].copy_from_slice(&VERSION.to_le_bytes());
202 header[8..16].copy_from_slice(&data_len.to_le_bytes());
203 self.block_lease
204 .block()
205 .write_block(HEADER_BLOCK, &header)
206 .map_err(|e| EdgeError::CheckpointFailed(format!("write header: {e}")))?;
207
208 // Write data blocks
209 let num_blocks = data.len().div_ceil(BLOCK_SIZE);
210 let available = self.block_lease.block().num_blocks() as usize;
211 if num_blocks + 1 > available {
212 return Err(EdgeError::CheckpointFailed(
213 "state too large for block lease".into(),
214 ));
215 }
216
217 for i in 0..num_blocks {
218 let start = i * BLOCK_SIZE;
219 let end = core::cmp::min(start + BLOCK_SIZE, data.len());
220 let mut block = [0u8; BLOCK_SIZE];
221 block[..end - start].copy_from_slice(&data[start..end]);
222 self.block_lease
223 .block()
224 .write_block(DATA_START_BLOCK + i as u64, &block)
225 .map_err(|e| EdgeError::CheckpointFailed(format!("write data block {i}: {e}")))?;
226 }
227
228 Ok(())
229 }
230
231 /// Restore a checkpointed state from block storage.
232 ///
233 /// Reads the header block to determine data length, then reads and
234 /// deserializes the state from the data blocks.
235 ///
236 /// # Errors
237 ///
238 /// Returns [`EdgeError::CheckpointFailed`] on corrupt data, version
239 /// mismatch, or deserialization failure.
240 pub fn restore(block_lease: BlockLease) -> Result<Self, EdgeError> {
241 let header = block_lease
242 .block()
243 .read_block(HEADER_BLOCK)
244 .map_err(|e| EdgeError::CheckpointFailed(format!("read header: {e}")))?;
245
246 if header[0..4] != MAGIC {
247 return Err(EdgeError::CheckpointFailed("bad magic".into()));
248 }
249 let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
250 if version != VERSION {
251 return Err(EdgeError::CheckpointFailed(format!(
252 "unsupported version: {version}"
253 )));
254 }
255 let data_len = u64::from_le_bytes([
256 header[8], header[9], header[10], header[11], header[12], header[13], header[14],
257 header[15],
258 ]) as usize;
259
260 let num_blocks = data_len.div_ceil(BLOCK_SIZE);
261 let mut data = Vec::with_capacity(data_len);
262 for i in 0..num_blocks {
263 let block = block_lease
264 .block()
265 .read_block(DATA_START_BLOCK + i as u64)
266 .map_err(|e| EdgeError::CheckpointFailed(format!("read data block {i}: {e}")))?;
267 let remaining = data_len - data.len();
268 let take = core::cmp::min(BLOCK_SIZE, remaining);
269 data.extend_from_slice(&block[..take]);
270 }
271
272 let state: T = postcard::from_bytes(&data)
273 .map_err(|e| EdgeError::CheckpointFailed(format!("deserialize: {e}")))?;
274
275 Ok(Self {
276 state,
277 block_lease,
278 lease_id: None,
279 observer: None,
280 })
281 }
282
283 /// Borrow the current state.
284 pub fn state(&self) -> &T {
285 &self.state
286 }
287
288 /// Mutably borrow the current state.
289 pub fn state_mut(&mut self) -> &mut T {
290 &mut self.state
291 }
292
293 /// Consume the wrapper and return the underlying block lease.
294 ///
295 /// This allows transferring the lease to a subsequent
296 /// [`restore`](Self::restore) call so that both sides access the
297 /// same underlying block storage.
298 pub fn into_block_lease(self) -> BlockLease {
299 self.block_lease
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EdgeCheckpoint;
    use core::sync::atomic::{AtomicU64, Ordering};
    use grafos_std::block::BlockBuilder;
    use grafos_std::host;

    /// Reset the host mock, size it to `num_blocks`, and acquire a lease.
    fn setup_block(num_blocks: u64) -> BlockLease {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(num_blocks);
        let acquired = BlockBuilder::new().acquire();
        acquired.expect("acquire")
    }

    #[test]
    fn checkpoint_and_restore_roundtrip() {
        let mut ckpt = CheckpointedStageState::new(
            EdgeCheckpoint {
                input_offset: 42,
                output_commit: 99,
            },
            setup_block(64),
        );
        ckpt.checkpoint().expect("checkpoint");

        // Hand the same lease to restore so it reads the blocks just
        // written.
        let restored =
            CheckpointedStageState::<EdgeCheckpoint>::restore(ckpt.into_block_lease())
                .expect("restore");
        let got = restored.state();
        assert_eq!(got.input_offset, 42);
        assert_eq!(got.output_commit, 99);
    }

    #[test]
    fn restore_fails_on_corrupt_data() {
        let block_lease = setup_block(64);

        // Put a header with the wrong magic into block 0.
        let mut garbage = [0u8; BLOCK_SIZE];
        garbage[..4].copy_from_slice(b"BAAD");
        block_lease.block().write_block(0, &garbage).expect("write");

        let Err(err) = CheckpointedStageState::<EdgeCheckpoint>::restore(block_lease) else {
            panic!("expected error but got Ok")
        };
        match err {
            EdgeError::CheckpointFailed(msg) => assert_eq!(msg, "bad magic"),
            other => panic!("unexpected error: {other}"),
        }
    }

    #[test]
    fn state_mut_allows_modification() {
        let mut ckpt = CheckpointedStageState::new(
            EdgeCheckpoint {
                input_offset: 0,
                output_commit: 0,
            },
            setup_block(64),
        );

        {
            let inner = ckpt.state_mut();
            inner.input_offset = 100;
            inner.output_commit = 200;
        }

        let seen = ckpt.state();
        assert_eq!(seen.input_offset, 100);
        assert_eq!(seen.output_commit, 200);
    }

    #[test]
    fn checkpoint_preserves_complex_state() {
        #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
        struct StageState {
            name: alloc::string::String,
            items_processed: u64,
            checkpoint: EdgeCheckpoint,
        }

        let block_lease = setup_block(64);
        let original = StageState {
            name: "transform-stage".into(),
            items_processed: 1024,
            checkpoint: EdgeCheckpoint {
                input_offset: 500,
                output_commit: 480,
            },
        };

        let mut ckpt = CheckpointedStageState::new(original.clone(), block_lease);
        ckpt.checkpoint().expect("checkpoint");

        // Hand the same lease to restore so it reads the blocks just
        // written.
        let restored = CheckpointedStageState::<StageState>::restore(ckpt.into_block_lease())
            .expect("restore");
        assert_eq!(restored.state(), &original);
    }

    // -----------------------------------------------------------------
    // Phase 218.2 / slice 96 — CheckpointFlushObserver pin tests
    // -----------------------------------------------------------------
    //
    // Contract being pinned: a `CheckpointedStageState` built with
    // `with_observer` calls `on_flush_start(lease_id)` at the top of
    // `checkpoint()` and `on_flush_complete(lease_id)` at the end, on
    // BOTH the success path and every failure path. The pin tests cover:
    //   - mark on flush start
    //   - clear on success
    //   - clear on failure (the most load-bearing one: without the drop
    //     guard a failed flush would leave the lease stuck in
    //     `CheckpointInProgress` forever)
    //   - no observer attached (back-compat)
    //
    // The observer is a sideband signal to the scheduler; it does NOT
    // change the flush's success semantics. A notification the observer
    // cannot act on (lease not tracked, etc.) is the implementation's
    // problem to handle internally.

    /// Test double that counts `on_flush_start` / `on_flush_complete`
    /// calls and remembers the most recent lease id passed to each.
    #[derive(Default)]
    struct RecordingObserver {
        start_count: AtomicU64,
        complete_count: AtomicU64,
        last_start_lease: AtomicU64,
        last_complete_lease: AtomicU64,
    }

    impl RecordingObserver {
        /// Number of `on_flush_start` calls seen so far.
        fn starts(&self) -> u64 {
            self.start_count.load(Ordering::SeqCst)
        }

        /// Number of `on_flush_complete` calls seen so far.
        fn completes(&self) -> u64 {
            self.complete_count.load(Ordering::SeqCst)
        }

        /// Low 64 bits of the lease id from the latest `on_flush_start`.
        fn started_lease(&self) -> u64 {
            self.last_start_lease.load(Ordering::SeqCst)
        }

        /// Low 64 bits of the lease id from the latest
        /// `on_flush_complete`.
        fn completed_lease(&self) -> u64 {
            self.last_complete_lease.load(Ordering::SeqCst)
        }
    }

    /// Shared handle: one clone goes into the wrapper under test, the
    /// other stays with the test for assertions.
    type SharedObserver = alloc::sync::Arc<RecordingObserver>;

    impl CheckpointFlushObserver for SharedObserver {
        fn on_flush_start(&self, lease_id: u128) {
            self.start_count.fetch_add(1, Ordering::SeqCst);
            // Every lease id used by these tests fits in a u64, so the
            // truncation loses nothing for the recorded comparison.
            self.last_start_lease
                .store(lease_id as u64, Ordering::SeqCst);
        }

        fn on_flush_complete(&self, lease_id: u128) {
            self.complete_count.fetch_add(1, Ordering::SeqCst);
            self.last_complete_lease
                .store(lease_id as u64, Ordering::SeqCst);
        }
    }

    /// Build an observer-wired wrapper over a fresh lease of `num_blocks`
    /// blocks and return it together with the test's observer handle.
    fn observed_stage(
        state: EdgeCheckpoint,
        num_blocks: u64,
        lease_id: u128,
    ) -> (CheckpointedStageState<EdgeCheckpoint>, SharedObserver) {
        let block_lease = setup_block(num_blocks);
        let observer = SharedObserver::new(RecordingObserver::default());
        let ckpt = CheckpointedStageState::with_observer(
            state,
            block_lease,
            lease_id,
            alloc::boxed::Box::new(observer.clone()),
        );
        (ckpt, observer)
    }

    #[test]
    fn checkpoint_flush_marks_lease_non_preemptible() {
        // `on_flush_start` must fire once, carrying the lease id the
        // producer wired. The matching clear is pinned by the
        // success-path test below.
        let lease_id: u128 = 0xdead_beef_cafe_babe;
        let (mut ckpt, observer) = observed_stage(
            EdgeCheckpoint {
                input_offset: 7,
                output_commit: 7,
            },
            64,
            lease_id,
        );

        ckpt.checkpoint().expect("checkpoint");

        assert_eq!(
            observer.starts(),
            1,
            "on_flush_start must be called exactly once at flush start"
        );
        assert_eq!(
            observer.started_lease(),
            lease_id as u64,
            "on_flush_start must receive the producer-named lease_id"
        );
    }

    #[test]
    fn checkpoint_flush_clears_marker_on_success() {
        // Full lifecycle of a successful flush: one start, one complete,
        // both keyed on the producer-named lease id.
        let lease_id: u128 = 0x1234_5678_9abc_def0;
        let (mut ckpt, observer) = observed_stage(
            EdgeCheckpoint {
                input_offset: 11,
                output_commit: 22,
            },
            64,
            lease_id,
        );

        ckpt.checkpoint().expect("checkpoint");

        assert_eq!(observer.starts(), 1);
        assert_eq!(
            observer.completes(),
            1,
            "on_flush_complete must be called exactly once on success"
        );
        assert_eq!(
            observer.completed_lease(),
            lease_id as u64,
            "on_flush_complete must receive the producer-named lease_id"
        );
    }

    #[test]
    fn checkpoint_flush_clears_marker_on_failure() {
        // The load-bearing test. The lease has a single block (room for
        // the header only), so any non-empty serialized state hits the
        // "state too large for block lease" rejection. Without the drop
        // guard a failed flush would leave the lease stuck in
        // `CheckpointInProgress` forever; this pins that
        // `on_flush_complete` still runs when `checkpoint()` returns an
        // error.
        let lease_id: u128 = 0xabcd_ef01_2345_6789;

        // The postcard encoding of an EdgeCheckpoint is several bytes,
        // so it needs at least one data block. None is available.
        let (mut ckpt, observer) = observed_stage(
            EdgeCheckpoint {
                input_offset: 99,
                output_commit: 100,
            },
            1,
            lease_id,
        );

        let result = ckpt.checkpoint();

        // The size check must have rejected the flush.
        assert!(
            matches!(result, Err(EdgeError::CheckpointFailed(_))),
            "expected CheckpointFailed, got {result:?}"
        );

        // Even though the flush failed, the marker must have been
        // cleared. FlushGuard's Drop is what guarantees it.
        assert_eq!(
            observer.starts(),
            1,
            "on_flush_start runs before the size check"
        );
        assert_eq!(
            observer.completes(),
            1,
            "on_flush_complete MUST run on the failure path \
             (RAII guard via Drop) so the lease isn't stuck \
             non-preemptible after a failed flush"
        );
        assert_eq!(
            observer.completed_lease(),
            lease_id as u64,
            "on_flush_complete must clear the producer-named lease_id"
        );
    }

    #[test]
    fn checkpoint_without_observer_remains_silent() {
        // Back-compat pin: a wrapper from the original `new()`
        // constructor carries no observer, and a flush on it must simply
        // succeed, exactly as it did before slice 96.
        let mut ckpt = CheckpointedStageState::new(
            EdgeCheckpoint {
                input_offset: 5,
                output_commit: 5,
            },
            setup_block(64),
        );

        // Reaching the end without a panic is the whole assertion: the
        // observer-less path neither calls into a missing observer nor
        // otherwise misbehaves.
        ckpt.checkpoint().expect("checkpoint");
    }
}