1extern crate alloc;
38use alloc::vec::Vec;
39
40use grafos_std::block::{BlockLease, BLOCK_SIZE};
41use grafos_std::error::{FabricError, Result};
42
43use serde::{de::DeserializeOwned, Serialize};
44
/// Four-byte tag stored in bytes `0..4` of the header block; `restore` rejects
/// a header whose first four bytes differ from it.
const MAGIC: [u8; 4] = [b'D', b'C', b'H', b'K'];
/// Checkpoint format version, stored little-endian in header bytes `4..8`.
const VERSION: u32 = 1;
/// Index of the block that holds the checkpoint header.
const HEADER_BLOCK: u64 = 0;
/// Index of the first block holding serialized payload bytes; the payload
/// immediately follows the header block.
const DATA_START_BLOCK: u64 = HEADER_BLOCK + 1;
49
/// A value of type `T` paired with a [`BlockLease`] that its serialized form
/// can be checkpointed to and restored from.
pub struct Durable<T> {
    /// The wrapped value; this is what `checkpoint` serializes.
    inner: T,
    /// Lease whose block device receives the header and payload blocks.
    block_lease: BlockLease,
    /// `Some(n)` makes `mutate` checkpoint once `n` mutations have
    /// accumulated; `None` disables automatic checkpoints.
    auto_checkpoint_batch: Option<u64>,
    /// Number of `mutate` calls counted since the counter was last reset
    /// (by an automatic checkpoint or by `set_auto_checkpoint`).
    mutation_count: u64,
}
90
91impl<T: Serialize + DeserializeOwned> Durable<T> {
92 pub fn new(inner: T, block_lease: BlockLease) -> Self {
97 Durable {
98 inner,
99 block_lease,
100 auto_checkpoint_batch: None,
101 mutation_count: 0,
102 }
103 }
104
105 pub fn inner(&self) -> &T {
107 &self.inner
108 }
109
110 pub fn inner_mut(&mut self) -> &mut T {
116 &mut self.inner
117 }
118
119 pub fn set_auto_checkpoint(&mut self, batch_size: Option<u64>) {
125 self.auto_checkpoint_batch = batch_size.filter(|&n| n > 0);
126 self.mutation_count = 0;
127 }
128
129 pub fn mutate<F>(&mut self, f: F) -> Result<()>
142 where
143 F: FnOnce(&mut T),
144 {
145 f(&mut self.inner);
146 self.mutation_count += 1;
147 if let Some(batch) = self.auto_checkpoint_batch {
148 if self.mutation_count >= batch {
149 self.checkpoint()?;
150 self.mutation_count = 0;
151 }
152 }
153 Ok(())
154 }
155
156 pub fn mutation_count(&self) -> u64 {
159 self.mutation_count
160 }
161
162 pub fn lease_id(&self) -> u128 {
165 self.block_lease.lease_id()
166 }
167
168 pub fn expires_at_unix_secs(&self) -> u64 {
171 self.block_lease.expires_at_unix_secs()
172 }
173
174 pub fn into_block_lease(self) -> BlockLease {
180 self.block_lease
181 }
182
183 pub fn checkpoint(&self) -> Result<()> {
194 let data = postcard::to_allocvec(&self.inner).map_err(|_| FabricError::IoError(-1))?;
195 let data_len = data.len() as u64;
196
197 let mut header = [0u8; BLOCK_SIZE];
199 header[0..4].copy_from_slice(&MAGIC);
200 header[4..8].copy_from_slice(&VERSION.to_le_bytes());
201 header[8..16].copy_from_slice(&data_len.to_le_bytes());
202 self.block_lease
203 .block()
204 .write_block(HEADER_BLOCK, &header)?;
205
206 let num_blocks = data.len().div_ceil(BLOCK_SIZE);
208 let available = self.block_lease.block().num_blocks() as usize;
209 if num_blocks + 1 > available {
210 return Err(FabricError::CapacityExceeded);
211 }
212
213 for i in 0..num_blocks {
214 let start = i * BLOCK_SIZE;
215 let end = core::cmp::min(start + BLOCK_SIZE, data.len());
216 let mut block = [0u8; BLOCK_SIZE];
217 block[..end - start].copy_from_slice(&data[start..end]);
218 self.block_lease
219 .block()
220 .write_block(DATA_START_BLOCK + i as u64, &block)?;
221 }
222
223 Ok(())
224 }
225
226 pub fn restore(block_lease: BlockLease) -> Result<Durable<T>> {
240 let header = block_lease.block().read_block(HEADER_BLOCK)?;
241
242 if header[0..4] != MAGIC {
244 return Err(FabricError::IoError(-2));
245 }
246 let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
247 if version != VERSION {
248 return Err(FabricError::IoError(-3));
249 }
250 let data_len = u64::from_le_bytes([
251 header[8], header[9], header[10], header[11], header[12], header[13], header[14],
252 header[15],
253 ]) as usize;
254
255 let num_blocks = data_len.div_ceil(BLOCK_SIZE);
257 let mut data = Vec::with_capacity(data_len);
258 for i in 0..num_blocks {
259 let block = block_lease
260 .block()
261 .read_block(DATA_START_BLOCK + i as u64)?;
262 let start = data.len();
263 let remaining = data_len - start;
264 let take = core::cmp::min(BLOCK_SIZE, remaining);
265 data.extend_from_slice(&block[..take]);
266 }
267
268 let inner: T = postcard::from_bytes(&data).map_err(|_| FabricError::IoError(-1))?;
269 Ok(Durable {
270 inner,
271 block_lease,
272 auto_checkpoint_batch: None,
273 mutation_count: 0,
274 })
275 }
276}
277
278impl<T> core::ops::Deref for Durable<T> {
279 type Target = T;
280
281 fn deref(&self) -> &T {
282 &self.inner
283 }
284}
285
286impl<T> core::ops::DerefMut for Durable<T> {
287 fn deref_mut(&mut self) -> &mut T {
288 &mut self.inner
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::block::BlockBuilder;
    use grafos_std::host;
    use serde::{Deserialize, Serialize};

    /// Calls `host::reset_mock`, sets the mock block count to `num_blocks`,
    /// and acquires a lease through `BlockBuilder`.
    fn setup_block(num_blocks: u64) -> BlockLease {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(num_blocks);
        BlockBuilder::new().acquire().expect("acquire")
    }

    /// Payload type used by every test in this module.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct Snapshot {
        items: Vec<u32>,
        count: u64,
    }

    /// A snapshot with no items and a zero count.
    fn empty_snapshot() -> Snapshot {
        Snapshot {
            items: Vec::new(),
            count: 0,
        }
    }

    /// Gives up the wrapper, keeps its lease, and restores from that lease.
    fn reopen(wrapper: Durable<Snapshot>) -> Durable<Snapshot> {
        let lease = wrapper.into_block_lease();
        Durable::restore(lease).expect("restore")
    }

    #[test]
    fn checkpoint_restore_roundtrip() {
        let lease = setup_block(64);
        let original = Snapshot {
            items: vec![1, 2, 3, 4, 5],
            count: 5,
        };

        let wrapper = Durable::new(original, lease);
        wrapper.checkpoint().expect("checkpoint");

        let reloaded = reopen(wrapper);
        assert_eq!(reloaded.inner().items, vec![1, 2, 3, 4, 5]);
        assert_eq!(reloaded.inner().count, 5);
    }

    #[test]
    fn deref_provides_transparent_access() {
        let lease = setup_block(64);
        let wrapper = Durable::new(
            Snapshot {
                items: vec![10, 20],
                count: 2,
            },
            lease,
        );

        // Field access goes through `Deref`.
        assert_eq!(wrapper.count, 2);
        assert_eq!(wrapper.items.len(), 2);
    }

    #[test]
    fn checkpoint_large_data() {
        let lease = setup_block(256);
        let original = Snapshot {
            items: (0..1000).collect(),
            count: 1000,
        };

        let wrapper = Durable::new(original, lease);
        wrapper.checkpoint().expect("checkpoint");

        let reloaded = reopen(wrapper);
        assert_eq!(reloaded.inner().count, 1000);
        assert_eq!(reloaded.inner().items.len(), 1000);
        assert_eq!(reloaded.inner().items[999], 999);
    }

    #[test]
    fn restore_bad_magic_fails() {
        let lease = setup_block(64);

        // Header block whose first four bytes are not the expected magic.
        let mut bogus_header = [0u8; BLOCK_SIZE];
        bogus_header[0..4].copy_from_slice(b"BAAD");
        lease.block().write_block(0, &bogus_header).expect("write");

        assert!(Durable::<Snapshot>::restore(lease).is_err());
    }

    #[test]
    fn inner_mut_allows_modification() {
        let lease = setup_block(64);
        let mut wrapper = Durable::new(
            Snapshot {
                items: vec![1],
                count: 1,
            },
            lease,
        );

        {
            let state = wrapper.inner_mut();
            state.items.push(2);
            state.count = 2;
        }
        wrapper.checkpoint().expect("checkpoint");

        let reloaded = reopen(wrapper);
        assert_eq!(reloaded.inner().items, vec![1, 2]);
        assert_eq!(reloaded.inner().count, 2);
    }

    #[test]
    fn auto_checkpoint_after_n_mutations() {
        let lease = setup_block(64);
        let mut wrapper = Durable::new(empty_snapshot(), lease);
        wrapper.set_auto_checkpoint(Some(3));

        // First two mutations only bump the counter.
        wrapper
            .mutate(|state| {
                state.items.push(1);
                state.count = 1;
            })
            .expect("mutate 1");
        assert_eq!(wrapper.mutation_count(), 1);

        wrapper
            .mutate(|state| {
                state.items.push(2);
                state.count = 2;
            })
            .expect("mutate 2");
        assert_eq!(wrapper.mutation_count(), 2);

        // The third reaches the batch size: checkpoint, then counter reset.
        wrapper
            .mutate(|state| {
                state.items.push(3);
                state.count = 3;
            })
            .expect("mutate 3");
        assert_eq!(wrapper.mutation_count(), 0);

        let reloaded = reopen(wrapper);
        assert_eq!(reloaded.inner().items, vec![1, 2, 3]);
        assert_eq!(reloaded.inner().count, 3);
    }

    #[test]
    fn auto_checkpoint_disabled_by_default() {
        let lease = setup_block(64);
        let mut wrapper = Durable::new(empty_snapshot(), lease);

        for value in 0..10u32 {
            wrapper
                .mutate(|state| {
                    state.items.push(value);
                    state.count += 1;
                })
                .expect("mutate");
        }
        assert_eq!(wrapper.mutation_count(), 10);

        // No checkpoint was ever taken, so restoring must fail.
        let lease = wrapper.into_block_lease();
        assert!(Durable::<Snapshot>::restore(lease).is_err());
    }

    #[test]
    fn set_auto_checkpoint_resets_counter() {
        let lease = setup_block(64);
        let mut wrapper = Durable::new(empty_snapshot(), lease);

        wrapper.set_auto_checkpoint(Some(5));
        wrapper.mutate(|state| state.count += 1).expect("mutate");
        wrapper.mutate(|state| state.count += 1).expect("mutate");
        assert_eq!(wrapper.mutation_count(), 2);

        // Reconfiguring clears whatever had been counted so far.
        wrapper.set_auto_checkpoint(Some(2));
        assert_eq!(wrapper.mutation_count(), 0);
    }
}