grafos_pipeline/
checkpoint.rs

1//! Durable checkpoint support for pipeline stage state.
2
3extern crate alloc;
4use alloc::boxed::Box;
5use alloc::format;
6use alloc::vec::Vec;
7
8use grafos_std::block::{BlockLease, BLOCK_SIZE};
9use serde::{de::DeserializeOwned, Serialize};
10
11use crate::error::EdgeError;
12
13/// Magic bytes identifying a pipeline checkpoint record.
14const MAGIC: [u8; 4] = *b"PCHK";
15const VERSION: u32 = 1;
16const HEADER_BLOCK: u64 = 0;
17const DATA_START_BLOCK: u64 = 1;
18
19// ---------------------------------------------------------------------------
20// CheckpointFlushObserver — Phase 218.2 / slice 96
21// ---------------------------------------------------------------------------
22
/// Receives a sideband notification when a checkpoint flush begins
/// and again when it ends.
///
/// Slice 96 introduces this hook so that the scheduler-service
/// `PreemptionManager` can flag a lease as
/// `NonPreemptibleReason::CheckpointInProgress` while a flush is
/// running. The trait is deliberately tiny and purely observational:
/// implementations MUST NOT block, MUST NOT try to report errors for
/// the flush to act on, and MUST NOT alter whether the flush succeeds.
/// If a notification cannot be honoured (for example the lease is no
/// longer tracked by the preemption manager because it was released,
/// expired, or fenced concurrently), the observer logs that
/// internally; the checkpoint flush is NOT aborted. The checkpoint is
/// the source of truth for stage durability, while the observer is
/// only a signal to the scheduler.
///
/// **Why a trait callback instead of a direct cross-crate call?**
/// `grafos-pipeline` is `no_std + alloc` and intentionally does not
/// depend on `grafos-scheduler-service` (which is `std`-only and
/// depends on the http server, audit chain, etc.). Calling it directly
/// would either drag the pipeline crate into `std` or drag
/// scheduler-service into `no_std` builds. With the trait, both sides
/// stay cycle-free: the producer (pipeline) calls a small trait and
/// the consumer (scheduler-service) implements it.
///
/// **Lifecycle contract:**
/// - `on_flush_start(lease_id)` is invoked exactly ONCE at the
///   beginning of a `checkpoint()` call, provided both the observer
///   and the `lease_id` are wired.
/// - `on_flush_complete(lease_id)` is invoked exactly ONCE at the end
///   of that `checkpoint()` call, on the success path AND on every
///   failure path. The pipeline side guarantees this with a
///   `Drop`-based scope guard, so clears do not have to be sprinkled
///   at every error site.
pub trait CheckpointFlushObserver {
    /// Invoked when a checkpoint flush BEGINS. Implementations should
    /// mark the lease as transiently non-preemptible.
    fn on_flush_start(&self, lease_id: u128);

    /// Invoked when a checkpoint flush ENDS, whether it succeeded or
    /// failed. Implementations should clear the non-preemptible
    /// marker.
    fn on_flush_complete(&self, lease_id: u128);
}
65
66/// RAII guard that calls `on_flush_complete` on drop. Ensures the
67/// marker is cleared even if the flush errors part-way through. Per
68/// the slice 96 requirement: "if a flush fails partway, the marker
69/// MUST be cleared so the lease becomes preemptible again."
70struct FlushGuard<'a> {
71    observer: &'a dyn CheckpointFlushObserver,
72    lease_id: u128,
73}
74
75impl<'a> Drop for FlushGuard<'a> {
76    fn drop(&mut self) {
77        self.observer.on_flush_complete(self.lease_id);
78    }
79}
80
81/// A stage state wrapper with durable checkpoint/restore support.
82///
83/// Serializes the state to a [`BlockLease`] on checkpoint and
84/// deserializes on restore, using a format compatible with the
85/// grafos-collections Durable pattern.
86///
87/// # Fail-closed
88///
89/// If a checkpoint write fails, the error is propagated immediately.
90/// Callers should halt the stage rather than continue with an
91/// uncheckpointed state.
92///
93/// # Preemption protection (Phase 218.2 / slice 96)
94///
95/// When constructed with [`CheckpointedStageState::with_observer`],
96/// the wrapper notifies a [`CheckpointFlushObserver`] at flush start
97/// and complete so the scheduler-service can mark the lease as
98/// `NonPreemptibleReason::CheckpointInProgress`. The marker is
99/// cleared on BOTH the success path and every failure path via an
100/// RAII guard. Wrappers constructed via [`CheckpointedStageState::new`]
101/// continue to flush without the sideband notification (back-compat).
102pub struct CheckpointedStageState<T> {
103    state: T,
104    block_lease: BlockLease,
105    /// Identifier of the lease backing this stage's durable state.
106    /// `None` for back-compat callers; `Some(_)` when the producer
107    /// has named the lease whose preemption window the flush
108    /// occupies.
109    lease_id: Option<u128>,
110    /// Sideband observer notified at flush start / complete. `None`
111    /// for back-compat callers; `Some(_)` when wired via
112    /// [`Self::with_observer`].
113    observer: Option<Box<dyn CheckpointFlushObserver + Send + Sync>>,
114}
115
116impl<T: Serialize + DeserializeOwned> CheckpointedStageState<T> {
117    /// Create a new checkpointed state wrapper.
118    ///
119    /// Does **not** write an initial checkpoint. Call [`checkpoint`](Self::checkpoint)
120    /// to persist.
121    ///
122    /// Constructed without a [`CheckpointFlushObserver`] — checkpoint
123    /// flushes do NOT emit preemption-protection sideband notifications.
124    /// Use [`Self::with_observer`] to attach the marker producer for
125    /// `NonPreemptibleReason::CheckpointInProgress` (Phase 218.2 /
126    /// slice 96).
127    pub fn new(state: T, block_lease: BlockLease) -> Self {
128        Self {
129            state,
130            block_lease,
131            lease_id: None,
132            observer: None,
133        }
134    }
135
136    /// Phase 218.2 / slice 96 — attach a [`CheckpointFlushObserver`]
137    /// and the `lease_id` of the lease whose preemption window the
138    /// flush occupies. Calls to [`Self::checkpoint`] then notify the
139    /// observer at flush start AND on flush complete (both success
140    /// AND failure paths, via an RAII guard).
141    ///
142    /// The producer is responsible for naming the correct
143    /// `lease_id` — the lease backing the stage's durable
144    /// checkpoint storage ([`BlockLease`]). The observer is
145    /// typically the scheduler-service `PreemptionManager`'s
146    /// checkpoint adapter (see
147    /// `grafos_scheduler_service::preemption::PreemptionCheckpointObserver`).
148    pub fn with_observer(
149        state: T,
150        block_lease: BlockLease,
151        lease_id: u128,
152        observer: Box<dyn CheckpointFlushObserver + Send + Sync>,
153    ) -> Self {
154        Self {
155            state,
156            block_lease,
157            lease_id: Some(lease_id),
158            observer: Some(observer),
159        }
160    }
161
162    /// Serialize the current state to block storage.
163    ///
164    /// Uses a header block + data block layout:
165    /// - Block 0: magic (4) + version (4) + data_len (8)
166    /// - Blocks 1..N: postcard-serialized state bytes
167    ///
168    /// # Preemption protection (Phase 218.2 / slice 96)
169    ///
170    /// When this `CheckpointedStageState` was constructed via
171    /// [`Self::with_observer`], the observer's `on_flush_start` is
172    /// called at the top of this method and `on_flush_complete` is
173    /// called when the flush returns — on BOTH the success path and
174    /// every failure path. The notification is implemented via an
175    /// RAII guard so partway-through errors don't leave the lease
176    /// stuck in `CheckpointInProgress`.
177    ///
178    /// # Errors
179    ///
180    /// Returns [`EdgeError::CheckpointFailed`] on serialization or I/O failure.
181    pub fn checkpoint(&mut self) -> Result<(), EdgeError> {
182        // Phase 218.2 / slice 96 — sideband flush-protection
183        // notification. The guard's Drop ensures `on_flush_complete`
184        // runs on every exit path (Ok, Err, or panic-unwind).
185        let _flush_guard = match (self.observer.as_deref(), self.lease_id) {
186            (Some(observer), Some(lease_id)) => {
187                observer.on_flush_start(lease_id);
188                Some(FlushGuard { observer, lease_id })
189            }
190            _ => None,
191        };
192
193        let data = postcard::to_allocvec(&self.state)
194            .map_err(|e| EdgeError::CheckpointFailed(format!("serialize: {e}")))?;
195
196        let data_len = data.len() as u64;
197
198        // Write header block
199        let mut header = [0u8; BLOCK_SIZE];
200        header[0..4].copy_from_slice(&MAGIC);
201        header[4..8].copy_from_slice(&VERSION.to_le_bytes());
202        header[8..16].copy_from_slice(&data_len.to_le_bytes());
203        self.block_lease
204            .block()
205            .write_block(HEADER_BLOCK, &header)
206            .map_err(|e| EdgeError::CheckpointFailed(format!("write header: {e}")))?;
207
208        // Write data blocks
209        let num_blocks = data.len().div_ceil(BLOCK_SIZE);
210        let available = self.block_lease.block().num_blocks() as usize;
211        if num_blocks + 1 > available {
212            return Err(EdgeError::CheckpointFailed(
213                "state too large for block lease".into(),
214            ));
215        }
216
217        for i in 0..num_blocks {
218            let start = i * BLOCK_SIZE;
219            let end = core::cmp::min(start + BLOCK_SIZE, data.len());
220            let mut block = [0u8; BLOCK_SIZE];
221            block[..end - start].copy_from_slice(&data[start..end]);
222            self.block_lease
223                .block()
224                .write_block(DATA_START_BLOCK + i as u64, &block)
225                .map_err(|e| EdgeError::CheckpointFailed(format!("write data block {i}: {e}")))?;
226        }
227
228        Ok(())
229    }
230
231    /// Restore a checkpointed state from block storage.
232    ///
233    /// Reads the header block to determine data length, then reads and
234    /// deserializes the state from the data blocks.
235    ///
236    /// # Errors
237    ///
238    /// Returns [`EdgeError::CheckpointFailed`] on corrupt data, version
239    /// mismatch, or deserialization failure.
240    pub fn restore(block_lease: BlockLease) -> Result<Self, EdgeError> {
241        let header = block_lease
242            .block()
243            .read_block(HEADER_BLOCK)
244            .map_err(|e| EdgeError::CheckpointFailed(format!("read header: {e}")))?;
245
246        if header[0..4] != MAGIC {
247            return Err(EdgeError::CheckpointFailed("bad magic".into()));
248        }
249        let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
250        if version != VERSION {
251            return Err(EdgeError::CheckpointFailed(format!(
252                "unsupported version: {version}"
253            )));
254        }
255        let data_len = u64::from_le_bytes([
256            header[8], header[9], header[10], header[11], header[12], header[13], header[14],
257            header[15],
258        ]) as usize;
259
260        let num_blocks = data_len.div_ceil(BLOCK_SIZE);
261        let mut data = Vec::with_capacity(data_len);
262        for i in 0..num_blocks {
263            let block = block_lease
264                .block()
265                .read_block(DATA_START_BLOCK + i as u64)
266                .map_err(|e| EdgeError::CheckpointFailed(format!("read data block {i}: {e}")))?;
267            let remaining = data_len - data.len();
268            let take = core::cmp::min(BLOCK_SIZE, remaining);
269            data.extend_from_slice(&block[..take]);
270        }
271
272        let state: T = postcard::from_bytes(&data)
273            .map_err(|e| EdgeError::CheckpointFailed(format!("deserialize: {e}")))?;
274
275        Ok(Self {
276            state,
277            block_lease,
278            lease_id: None,
279            observer: None,
280        })
281    }
282
283    /// Borrow the current state.
284    pub fn state(&self) -> &T {
285        &self.state
286    }
287
288    /// Mutably borrow the current state.
289    pub fn state_mut(&mut self) -> &mut T {
290        &mut self.state
291    }
292
293    /// Consume the wrapper and return the underlying block lease.
294    ///
295    /// This allows transferring the lease to a subsequent
296    /// [`restore`](Self::restore) call so that both sides access the
297    /// same underlying block storage.
298    pub fn into_block_lease(self) -> BlockLease {
299        self.block_lease
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EdgeCheckpoint;
    use core::sync::atomic::{AtomicU64, Ordering};
    use grafos_std::block::BlockBuilder;
    use grafos_std::host;

    /// Reset the host mock, size it to `num_blocks`, and acquire a
    /// fresh block lease from it.
    fn setup_block(num_blocks: u64) -> BlockLease {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(num_blocks);
        BlockBuilder::new().acquire().expect("acquire")
    }

    #[test]
    fn checkpoint_and_restore_roundtrip() {
        let mut ckpt = CheckpointedStageState::new(
            EdgeCheckpoint {
                input_offset: 42,
                output_commit: 99,
            },
            setup_block(64),
        );
        ckpt.checkpoint().expect("checkpoint");

        // Hand the same lease to `restore` so it reads the blocks
        // that were just written.
        let restored =
            CheckpointedStageState::<EdgeCheckpoint>::restore(ckpt.into_block_lease())
                .expect("restore");
        let got = restored.state();
        assert_eq!(got.input_offset, 42);
        assert_eq!(got.output_commit, 99);
    }

    #[test]
    fn restore_fails_on_corrupt_data() {
        let block_lease = setup_block(64);
        // Put a header with the wrong magic into block 0.
        let mut garbage = [0u8; BLOCK_SIZE];
        garbage[0..4].copy_from_slice(b"BAAD");
        block_lease.block().write_block(0, &garbage).expect("write");

        let err = match CheckpointedStageState::<EdgeCheckpoint>::restore(block_lease) {
            Ok(_) => panic!("expected error but got Ok"),
            Err(err) => err,
        };
        match err {
            EdgeError::CheckpointFailed(msg) => assert_eq!(msg, "bad magic"),
            other => panic!("unexpected error: {other}"),
        }
    }

    #[test]
    fn state_mut_allows_modification() {
        let mut ckpt = CheckpointedStageState::new(
            EdgeCheckpoint {
                input_offset: 0,
                output_commit: 0,
            },
            setup_block(64),
        );

        {
            let state = ckpt.state_mut();
            state.input_offset = 100;
            state.output_commit = 200;
        }

        assert_eq!(ckpt.state().input_offset, 100);
        assert_eq!(ckpt.state().output_commit, 200);
    }

    #[test]
    fn checkpoint_preserves_complex_state() {
        #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
        struct StageState {
            name: alloc::string::String,
            items_processed: u64,
            checkpoint: EdgeCheckpoint,
        }

        let block_lease = setup_block(64);
        let original = StageState {
            name: "transform-stage".into(),
            items_processed: 1024,
            checkpoint: EdgeCheckpoint {
                input_offset: 500,
                output_commit: 480,
            },
        };

        let mut ckpt = CheckpointedStageState::new(original.clone(), block_lease);
        ckpt.checkpoint().expect("checkpoint");

        // Hand the same lease to `restore` so it reads the blocks
        // that were just written.
        let restored = CheckpointedStageState::<StageState>::restore(ckpt.into_block_lease())
            .expect("restore");
        assert_eq!(restored.state(), &original);
    }

    // -----------------------------------------------------------------
    // Phase 218.2 / slice 96 — CheckpointFlushObserver pin tests
    // -----------------------------------------------------------------
    //
    // Contract under test: a `CheckpointedStageState` built through
    // `with_observer` calls the observer's `on_flush_start(lease_id)`
    // at the top of `checkpoint()` and `on_flush_complete(lease_id)`
    // at the end, on the success path AND on every failure path.
    // Three pin tests cover:
    //   - mark on flush start
    //   - clear on success
    //   - clear on failure (the most load-bearing one: without RAII
    //     discipline a flush that fails part-way would leave the lease
    //     stuck in `CheckpointInProgress` forever)
    //
    // The observer is only a sideband signal to the scheduler and does
    // NOT change whether the flush succeeds. A notification that
    // cannot be honoured (lease not tracked, etc.) is for the observer
    // implementation to handle internally.

    /// Test double that counts `on_flush_start` / `on_flush_complete`
    /// calls and remembers the most recent lease id passed to each, so
    /// the lifecycle can be pinned precisely.
    #[derive(Default)]
    struct RecordingObserver {
        start_count: AtomicU64,
        complete_count: AtomicU64,
        last_start_lease: AtomicU64,
        last_complete_lease: AtomicU64,
    }

    impl RecordingObserver {
        /// Number of `on_flush_start` calls seen so far.
        fn starts(&self) -> u64 {
            self.start_count.load(Ordering::SeqCst)
        }

        /// Number of `on_flush_complete` calls seen so far.
        fn completes(&self) -> u64 {
            self.complete_count.load(Ordering::SeqCst)
        }

        /// Lease id (low 64 bits) from the latest `on_flush_start`.
        fn last_started(&self) -> u64 {
            self.last_start_lease.load(Ordering::SeqCst)
        }

        /// Lease id (low 64 bits) from the latest `on_flush_complete`.
        fn last_completed(&self) -> u64 {
            self.last_complete_lease.load(Ordering::SeqCst)
        }
    }

    /// Shared handle, so the test keeps one reference while the
    /// wrapper owns a boxed clone.
    type SharedObserver = alloc::sync::Arc<RecordingObserver>;

    impl CheckpointFlushObserver for SharedObserver {
        fn on_flush_start(&self, lease_id: u128) {
            self.start_count.fetch_add(1, Ordering::SeqCst);
            // Every lease id used in these tests fits in u64, so the
            // truncation loses nothing for the recording assertions.
            self.last_start_lease
                .store(lease_id as u64, Ordering::SeqCst);
        }

        fn on_flush_complete(&self, lease_id: u128) {
            self.complete_count.fetch_add(1, Ordering::SeqCst);
            self.last_complete_lease
                .store(lease_id as u64, Ordering::SeqCst);
        }
    }

    /// Build an observer-wired wrapper over a fresh lease of
    /// `num_blocks` blocks, returning it with the test's own handle to
    /// the recording observer.
    fn observed_checkpoint(
        num_blocks: u64,
        lease_id: u128,
        state: EdgeCheckpoint,
    ) -> (CheckpointedStageState<EdgeCheckpoint>, SharedObserver) {
        let block_lease = setup_block(num_blocks);
        let observer = SharedObserver::new(RecordingObserver::default());
        let ckpt = CheckpointedStageState::with_observer(
            state,
            block_lease,
            lease_id,
            alloc::boxed::Box::new(observer.clone()),
        );
        (ckpt, observer)
    }

    #[test]
    fn checkpoint_flush_marks_lease_non_preemptible() {
        // `on_flush_start` must fire once, at the BEGINNING of the
        // flush, carrying the lease id the producer wired. Paired with
        // the success-path clear test below.
        let lease_id: u128 = 0xdead_beef_cafe_babe;
        let (mut ckpt, observer) = observed_checkpoint(
            64,
            lease_id,
            EdgeCheckpoint {
                input_offset: 7,
                output_commit: 7,
            },
        );
        ckpt.checkpoint().expect("checkpoint");

        assert_eq!(
            observer.starts(),
            1,
            "on_flush_start must be called exactly once at flush start"
        );
        assert_eq!(
            observer.last_started(),
            lease_id as u64,
            "on_flush_start must receive the producer-named lease_id"
        );
    }

    #[test]
    fn checkpoint_flush_clears_marker_on_success() {
        // Full lifecycle of a successful flush: one start, one
        // complete, both keyed on the producer-named lease id.
        let lease_id: u128 = 0x1234_5678_9abc_def0;
        let (mut ckpt, observer) = observed_checkpoint(
            64,
            lease_id,
            EdgeCheckpoint {
                input_offset: 11,
                output_commit: 22,
            },
        );
        ckpt.checkpoint().expect("checkpoint");

        assert_eq!(observer.starts(), 1);
        assert_eq!(
            observer.completes(),
            1,
            "on_flush_complete must be called exactly once on success"
        );
        assert_eq!(
            observer.last_completed(),
            lease_id as u64,
            "on_flush_complete must receive the producer-named lease_id"
        );
    }

    #[test]
    fn checkpoint_flush_clears_marker_on_failure() {
        // The load-bearing test. The lease has a single block, i.e.
        // room for the header only, so any non-empty serialized state
        // hits the "state too large for block lease" path. Postcard
        // encodes an `EdgeCheckpoint` to several bytes, which needs at
        // least one data block that this lease does not have. Without
        // the RAII guard such a failure would leave the lease stuck in
        // `CheckpointInProgress` forever; this pins that
        // `on_flush_complete` still runs when the flush returns an
        // error.
        let lease_id: u128 = 0xabcd_ef01_2345_6789;
        let (mut ckpt, observer) = observed_checkpoint(
            1,
            lease_id,
            EdgeCheckpoint {
                input_offset: 99,
                output_commit: 100,
            },
        );
        let result = ckpt.checkpoint();

        // The size check must have rejected the flush.
        assert!(
            matches!(result, Err(EdgeError::CheckpointFailed(_))),
            "expected CheckpointFailed, got {result:?}"
        );

        // Despite the failure, the marker MUST have been cleared;
        // FlushGuard's Drop is what makes that happen.
        assert_eq!(
            observer.starts(),
            1,
            "on_flush_start runs before the size check"
        );
        assert_eq!(
            observer.completes(),
            1,
            "on_flush_complete MUST run on the failure path \
             (RAII guard via Drop) so the lease isn't stuck \
             non-preemptible after a failed flush"
        );
        assert_eq!(
            observer.last_completed(),
            lease_id as u64,
            "on_flush_complete must clear the producer-named lease_id"
        );
    }

    #[test]
    fn checkpoint_without_observer_remains_silent() {
        // Back-compat pin: `new()` yields a wrapper with no observer.
        // Its flushes must not panic, must not call into a missing
        // observer, and must behave as they did before slice 96.
        let mut ckpt = CheckpointedStageState::new(
            EdgeCheckpoint {
                input_offset: 5,
                output_commit: 5,
            },
            setup_block(64),
        );
        ckpt.checkpoint().expect("checkpoint");
        // Reaching this point is the assertion: the no-observer path
        // completed without panicking or dispatching anywhere.
    }
}