grafos_collections/
durable.rs

1//! Checkpoint wrapper that persists a collection to block storage.
2//!
3//! [`Durable<T>`] wraps any `Serialize + DeserializeOwned` value (typically
4//! a collection's state snapshot) and provides [`checkpoint()`](Durable::checkpoint)
5//! and [`restore()`](Durable::restore) to write and read the inner value
6//! from a [`BlockLease`].
7//!
8//! # Checkpoint format
9//!
10//! ```text
11//! Block 0:     [header]  magic: "DCHK" (4) | version: u32 (4) | data_len: u64 (8) | padding (496)
12//! Block 1..N:  [data]    postcard-serialized value, zero-padded to 512-byte boundaries
13//! ```
14//!
15//! # Example
16//!
17//! ```rust,no_run
18//! use grafos_collections::durable::Durable;
19//! use grafos_std::block::BlockBuilder;
20//! use serde::{Serialize, Deserialize};
21//!
22//! # grafos_std::host::reset_mock();
23//! # grafos_std::host::mock_set_fbbu_num_blocks(128);
24//! #[derive(Serialize, Deserialize, Debug, PartialEq)]
25//! struct State { counter: u64 }
26//!
27//! let lease = BlockBuilder::new().acquire()?;
28//! let durable = Durable::new(State { counter: 42 }, lease);
29//! durable.checkpoint()?;
30//!
31//! let lease = durable.into_block_lease();
32//! let restored: Durable<State> = Durable::restore(lease)?;
33//! assert_eq!(restored.inner().counter, 42);
34//! # Ok::<(), grafos_std::FabricError>(())
35//! ```
36
37extern crate alloc;
38use alloc::vec::Vec;
39
40use grafos_std::block::{BlockLease, BLOCK_SIZE};
41use grafos_std::error::{FabricError, Result};
42
43use serde::{de::DeserializeOwned, Serialize};
44
/// Four-byte tag stored at the start of the header block; `restore()`
/// rejects a header whose first four bytes differ.
const MAGIC: [u8; 4] = *b"DCHK";
/// Checkpoint format version, stored little-endian in header bytes 4..8.
/// `restore()` only accepts an exact match.
const VERSION: u32 = 1;
/// Index of the block that holds the checkpoint header.
const HEADER_BLOCK: u64 = 0;
/// Index of the first block that holds serialized payload bytes.
const DATA_START_BLOCK: u64 = 1;
49
/// A wrapper that checkpoints its inner value to block storage.
///
/// `Durable<T>` owns both the inner value and a [`BlockLease`] used for
/// persistence. Calling [`checkpoint()`](Durable::checkpoint) serializes
/// the inner value and writes it to the block lease.
/// [`restore()`](Durable::restore) reads a checkpoint and reconstructs
/// the value.
///
/// Implements `Deref<Target = T>` and `DerefMut`, so you can access the
/// inner value's fields and methods directly through the wrapper.
/// Changes made through `DerefMut` or [`inner_mut()`](Durable::inner_mut)
/// are not counted toward auto-checkpoint; only
/// [`mutate()`](Durable::mutate) increments the mutation counter.
///
/// # Example
///
/// ```rust
/// use grafos_collections::durable::Durable;
/// use grafos_std::block::BlockBuilder;
/// use serde::{Serialize, Deserialize};
///
/// # grafos_std::host::reset_mock();
/// # grafos_std::host::mock_set_fbbu_num_blocks(128);
/// #[derive(Serialize, Deserialize)]
/// struct Data { values: Vec<u32> }
///
/// let lease = BlockBuilder::new().acquire()?;
/// let mut d = Durable::new(Data { values: vec![1, 2, 3] }, lease);
///
/// // Deref access
/// assert_eq!(d.values.len(), 3);
///
/// // Mutate and checkpoint
/// d.inner_mut().values.push(4);
/// d.checkpoint()?;
/// # Ok::<(), grafos_std::FabricError>(())
/// ```
pub struct Durable<T> {
    /// The wrapped value that gets serialized on checkpoint.
    inner: T,
    /// Block lease the checkpoint is written to and read from.
    block_lease: BlockLease,
    /// Number of `mutate()` calls between automatic checkpoints;
    /// `None` means auto-checkpoint is disabled.
    auto_checkpoint_batch: Option<u64>,
    /// `mutate()` calls since the counter was last reset (by an
    /// auto-checkpoint or by `set_auto_checkpoint`).
    mutation_count: u64,
}
90
91impl<T: Serialize + DeserializeOwned> Durable<T> {
92    /// Wrap a value with a block lease for checkpointing.
93    ///
94    /// Does not write anything to the block lease. Call
95    /// [`checkpoint()`](Durable::checkpoint) to persist the value.
96    pub fn new(inner: T, block_lease: BlockLease) -> Self {
97        Durable {
98            inner,
99            block_lease,
100            auto_checkpoint_batch: None,
101            mutation_count: 0,
102        }
103    }
104
105    /// Returns a reference to the inner value.
106    pub fn inner(&self) -> &T {
107        &self.inner
108    }
109
110    /// Returns a mutable reference to the inner value.
111    ///
112    /// This does **not** increment the mutation counter. Use
113    /// [`mutate()`](Durable::mutate) if you want auto-checkpoint
114    /// tracking.
115    pub fn inner_mut(&mut self) -> &mut T {
116        &mut self.inner
117    }
118
119    /// Enable auto-checkpoint: after every `batch_size` calls to
120    /// [`mutate()`](Durable::mutate), the state is automatically
121    /// checkpointed to block storage.
122    ///
123    /// Pass `0` or call with `None` to disable auto-checkpoint.
124    pub fn set_auto_checkpoint(&mut self, batch_size: Option<u64>) {
125        self.auto_checkpoint_batch = batch_size.filter(|&n| n > 0);
126        self.mutation_count = 0;
127    }
128
129    /// Apply a mutation to the inner value and optionally auto-checkpoint.
130    ///
131    /// The closure `f` receives `&mut T` and can modify it. After the
132    /// closure returns, the mutation counter is incremented. If
133    /// auto-checkpoint is enabled and the counter reaches the batch
134    /// size, [`checkpoint()`](Durable::checkpoint) is called
135    /// automatically and the counter resets to zero.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error only if auto-checkpoint triggers and the
140    /// checkpoint itself fails.
141    pub fn mutate<F>(&mut self, f: F) -> Result<()>
142    where
143        F: FnOnce(&mut T),
144    {
145        f(&mut self.inner);
146        self.mutation_count += 1;
147        if let Some(batch) = self.auto_checkpoint_batch {
148            if self.mutation_count >= batch {
149                self.checkpoint()?;
150                self.mutation_count = 0;
151            }
152        }
153        Ok(())
154    }
155
156    /// Returns the number of mutations since the last checkpoint (or
157    /// since auto-checkpoint was enabled).
158    pub fn mutation_count(&self) -> u64 {
159        self.mutation_count
160    }
161
162    /// Returns the lease ID of the block lease for external renewal
163    /// management (e.g. via [`grafos_leasekit::RenewalManager`]).
164    pub fn lease_id(&self) -> u128 {
165        self.block_lease.lease_id()
166    }
167
168    /// Returns the expiry time (unix seconds) of the block lease for
169    /// external renewal management.
170    pub fn expires_at_unix_secs(&self) -> u64 {
171        self.block_lease.expires_at_unix_secs()
172    }
173
174    /// Consume the `Durable` and return the block lease.
175    ///
176    /// Useful for passing the same lease to [`restore()`](Durable::restore)
177    /// after a checkpoint, simulating a process restart that re-opens the
178    /// same block device.
179    pub fn into_block_lease(self) -> BlockLease {
180        self.block_lease
181    }
182
183    /// Serialize the inner value and write it to block storage.
184    ///
185    /// Writes a header to block 0 (magic, version, data length) followed
186    /// by the postcard-serialized data across blocks 1..N.
187    ///
188    /// # Errors
189    ///
190    /// - [`FabricError::CapacityExceeded`] if the serialized data requires
191    ///   more blocks than available on the device.
192    /// - [`FabricError::IoError`] if serialization or block write fails.
193    pub fn checkpoint(&self) -> Result<()> {
194        let data = postcard::to_allocvec(&self.inner).map_err(|_| FabricError::IoError(-1))?;
195        let data_len = data.len() as u64;
196
197        // Write header block
198        let mut header = [0u8; BLOCK_SIZE];
199        header[0..4].copy_from_slice(&MAGIC);
200        header[4..8].copy_from_slice(&VERSION.to_le_bytes());
201        header[8..16].copy_from_slice(&data_len.to_le_bytes());
202        self.block_lease
203            .block()
204            .write_block(HEADER_BLOCK, &header)?;
205
206        // Write data blocks
207        let num_blocks = data.len().div_ceil(BLOCK_SIZE);
208        let available = self.block_lease.block().num_blocks() as usize;
209        if num_blocks + 1 > available {
210            return Err(FabricError::CapacityExceeded);
211        }
212
213        for i in 0..num_blocks {
214            let start = i * BLOCK_SIZE;
215            let end = core::cmp::min(start + BLOCK_SIZE, data.len());
216            let mut block = [0u8; BLOCK_SIZE];
217            block[..end - start].copy_from_slice(&data[start..end]);
218            self.block_lease
219                .block()
220                .write_block(DATA_START_BLOCK + i as u64, &block)?;
221        }
222
223        Ok(())
224    }
225
226    /// Read a checkpoint from block storage and reconstruct the value.
227    ///
228    /// Reads block 0, validates the magic (`"DCHK"`) and version, then
229    /// reads the data blocks and deserializes the value.
230    ///
231    /// # Errors
232    ///
233    /// - [`FabricError::IoError(-2)`](FabricError::IoError) if the magic
234    ///   bytes do not match.
235    /// - [`FabricError::IoError(-3)`](FabricError::IoError) if the version
236    ///   is unsupported.
237    /// - [`FabricError::IoError(-1)`](FabricError::IoError) if
238    ///   deserialization fails.
239    pub fn restore(block_lease: BlockLease) -> Result<Durable<T>> {
240        let header = block_lease.block().read_block(HEADER_BLOCK)?;
241
242        // Verify magic
243        if header[0..4] != MAGIC {
244            return Err(FabricError::IoError(-2));
245        }
246        let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
247        if version != VERSION {
248            return Err(FabricError::IoError(-3));
249        }
250        let data_len = u64::from_le_bytes([
251            header[8], header[9], header[10], header[11], header[12], header[13], header[14],
252            header[15],
253        ]) as usize;
254
255        // Read data blocks
256        let num_blocks = data_len.div_ceil(BLOCK_SIZE);
257        let mut data = Vec::with_capacity(data_len);
258        for i in 0..num_blocks {
259            let block = block_lease
260                .block()
261                .read_block(DATA_START_BLOCK + i as u64)?;
262            let start = data.len();
263            let remaining = data_len - start;
264            let take = core::cmp::min(BLOCK_SIZE, remaining);
265            data.extend_from_slice(&block[..take]);
266        }
267
268        let inner: T = postcard::from_bytes(&data).map_err(|_| FabricError::IoError(-1))?;
269        Ok(Durable {
270            inner,
271            block_lease,
272            auto_checkpoint_batch: None,
273            mutation_count: 0,
274        })
275    }
276}
277
278impl<T> core::ops::Deref for Durable<T> {
279    type Target = T;
280
281    fn deref(&self) -> &T {
282        &self.inner
283    }
284}
285
286impl<T> core::ops::DerefMut for Durable<T> {
287    fn deref_mut(&mut self) -> &mut T {
288        &mut self.inner
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use grafos_std::block::BlockBuilder;
296    use grafos_std::host;
297    use serde::{Deserialize, Serialize};
298
299    fn setup_block(num_blocks: u64) -> BlockLease {
300        host::reset_mock();
301        host::mock_set_fbbu_num_blocks(num_blocks);
302        BlockBuilder::new().acquire().expect("acquire")
303    }
304
    /// Small serializable state used as the checkpoint payload in these
    /// tests.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct Snapshot {
        /// Variable-length part of the payload.
        items: Vec<u32>,
        /// Scalar part of the payload; tests set it independently of
        /// `items`.
        count: u64,
    }
310
311    #[test]
312    fn checkpoint_restore_roundtrip() {
313        let block_lease = setup_block(64);
314        let snap = Snapshot {
315            items: vec![1, 2, 3, 4, 5],
316            count: 5,
317        };
318
319        let durable = Durable::new(snap, block_lease);
320        durable.checkpoint().expect("checkpoint");
321
322        // Restore from same block lease (simulating restart with same device)
323        let block_lease = durable.into_block_lease();
324        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
325
326        assert_eq!(restored.inner().items, vec![1, 2, 3, 4, 5]);
327        assert_eq!(restored.inner().count, 5);
328    }
329
330    #[test]
331    fn deref_provides_transparent_access() {
332        let block_lease = setup_block(64);
333        let snap = Snapshot {
334            items: vec![10, 20],
335            count: 2,
336        };
337        let durable = Durable::new(snap, block_lease);
338
339        // Deref: access inner fields directly
340        assert_eq!(durable.count, 2);
341        assert_eq!(durable.items.len(), 2);
342    }
343
344    #[test]
345    fn checkpoint_large_data() {
346        let block_lease = setup_block(256);
347        let items: Vec<u32> = (0..1000).collect();
348        let snap = Snapshot { items, count: 1000 };
349
350        let durable = Durable::new(snap, block_lease);
351        durable.checkpoint().expect("checkpoint");
352
353        let block_lease = durable.into_block_lease();
354        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
355        assert_eq!(restored.inner().count, 1000);
356        assert_eq!(restored.inner().items.len(), 1000);
357        assert_eq!(restored.inner().items[999], 999);
358    }
359
360    #[test]
361    fn restore_bad_magic_fails() {
362        let block_lease = setup_block(64);
363        // Write garbage to block 0
364        let mut garbage = [0u8; BLOCK_SIZE];
365        garbage[0..4].copy_from_slice(b"BAAD");
366        block_lease.block().write_block(0, &garbage).expect("write");
367
368        let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
369        assert!(result.is_err());
370    }
371
372    #[test]
373    fn inner_mut_allows_modification() {
374        let block_lease = setup_block(64);
375        let snap = Snapshot {
376            items: vec![1],
377            count: 1,
378        };
379        let mut durable = Durable::new(snap, block_lease);
380
381        durable.inner_mut().items.push(2);
382        durable.inner_mut().count = 2;
383        durable.checkpoint().expect("checkpoint");
384
385        let block_lease = durable.into_block_lease();
386        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
387        assert_eq!(restored.inner().items, vec![1, 2]);
388        assert_eq!(restored.inner().count, 2);
389    }
390
391    #[test]
392    fn auto_checkpoint_after_n_mutations() {
393        let block_lease = setup_block(64);
394        let snap = Snapshot {
395            items: vec![],
396            count: 0,
397        };
398        let mut durable = Durable::new(snap, block_lease);
399        durable.set_auto_checkpoint(Some(3));
400
401        // First two mutations: no checkpoint yet
402        durable
403            .mutate(|s| {
404                s.items.push(1);
405                s.count = 1;
406            })
407            .expect("mutate 1");
408        assert_eq!(durable.mutation_count(), 1);
409
410        durable
411            .mutate(|s| {
412                s.items.push(2);
413                s.count = 2;
414            })
415            .expect("mutate 2");
416        assert_eq!(durable.mutation_count(), 2);
417
418        // Third mutation triggers auto-checkpoint, counter resets
419        durable
420            .mutate(|s| {
421                s.items.push(3);
422                s.count = 3;
423            })
424            .expect("mutate 3");
425        assert_eq!(durable.mutation_count(), 0);
426
427        // Verify the checkpoint was written by restoring
428        let block_lease = durable.into_block_lease();
429        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
430        assert_eq!(restored.inner().items, vec![1, 2, 3]);
431        assert_eq!(restored.inner().count, 3);
432    }
433
434    #[test]
435    fn auto_checkpoint_disabled_by_default() {
436        let block_lease = setup_block(64);
437        let snap = Snapshot {
438            items: vec![],
439            count: 0,
440        };
441        let mut durable = Durable::new(snap, block_lease);
442
443        // No auto-checkpoint set — mutations just count
444        for i in 0..10u32 {
445            durable
446                .mutate(|s| {
447                    s.items.push(i);
448                    s.count += 1;
449                })
450                .expect("mutate");
451        }
452        assert_eq!(durable.mutation_count(), 10);
453
454        // No checkpoint was written, so restore should fail (no magic)
455        let block_lease = durable.into_block_lease();
456        let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
457        assert!(result.is_err());
458    }
459
460    #[test]
461    fn set_auto_checkpoint_resets_counter() {
462        let block_lease = setup_block(64);
463        let snap = Snapshot {
464            items: vec![],
465            count: 0,
466        };
467        let mut durable = Durable::new(snap, block_lease);
468
469        durable.set_auto_checkpoint(Some(5));
470        durable.mutate(|s| s.count += 1).expect("mutate");
471        durable.mutate(|s| s.count += 1).expect("mutate");
472        assert_eq!(durable.mutation_count(), 2);
473
474        // Changing batch size resets counter
475        durable.set_auto_checkpoint(Some(2));
476        assert_eq!(durable.mutation_count(), 0);
477    }
478}