// grafos_kv/lib.rs

//! Key-value store over leased fabric resources.
//!
//! [`FabricKvStore`] provides a tiered key-value store where the hot tier
//! lives in leased DRAM (via [`FabricHashMap`]) and the cold tier (behind
//! the `persistence` feature) spills to leased block storage.
//!
//! Each key carries a TTL. On access, the TTL is refreshed. The
//! [`tick()`](FabricKvStore::tick) method evicts expired entries. When all
//! keys in the hot tier expire, the backing shard lease can be left to
//! expire naturally, freeing the remote memory.
//!
//! # Quick start
//!
//! ```rust
//! use grafos_kv::{KvBuilder, FabricKvStore};
//!
//! # grafos_std::host::reset_mock();
//! # grafos_std::host::mock_set_fbmu_arena_size(65536);
//! let mut kv: FabricKvStore = KvBuilder::new()
//!     .hot_buckets(64)
//!     .default_ttl_secs(300)
//!     .build()?;
//!
//! kv.put(b"hello", b"world")?;
//! assert_eq!(kv.get(b"hello")?, Some(b"world".to_vec()));
//! # Ok::<(), grafos_std::FabricError>(())
//! ```
//!
//! # Feature flags
//!
//! | Feature | Default | Effect |
//! |---------|---------|--------|
//! | `std` | Yes | Enables `std` in grafos-std |
//! | `persistence` | No | Enables cold tier (block storage spill/promote) |
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38extern crate alloc;
39
40use alloc::vec::Vec;
41
42use grafos_collections::map::FabricHashMap;
43use grafos_std::error::{FabricError, Result};
44use grafos_std::host;
45
46use serde::{Deserialize, Serialize};
47
48#[cfg(feature = "persistence")]
49use grafos_std::block::{BlockBuilder, BlockLease, BLOCK_SIZE};
50
/// Maximum serialized size for keys in the hot-tier FabricHashMap.
const KEY_STRIDE: usize = 128;
/// Maximum serialized size for values (KvEntry) in the hot-tier FabricHashMap.
/// Note: this bounds the whole serialized `KvEntry`, not just the raw value.
const VALUE_STRIDE: usize = 512;
55
/// Internal entry stored in the hot tier alongside the raw value bytes.
///
/// NOTE(review): serialization appears to go through postcard, which is
/// positional — do not reorder fields, or entries already written to a
/// surviving lease will deserialize incorrectly.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct KvEntry {
    /// Raw value bytes supplied by the caller.
    value: Vec<u8>,
    /// Unix time (seconds) of creation; refreshed to "now" on every `get`.
    created_at: u64,
    /// Time-to-live in seconds, measured from `created_at`.
    ttl_secs: u32,
    /// Number of successful `get` hits on this entry.
    access_count: u64,
}
64
/// Builder for [`FabricKvStore`].
///
/// # Example
///
/// ```rust
/// use grafos_kv::KvBuilder;
///
/// # grafos_std::host::reset_mock();
/// # grafos_std::host::mock_set_fbmu_arena_size(131072);
/// let mut kv = KvBuilder::new()
///     .hot_buckets(128)
///     .default_ttl_secs(60)
///     .build()?;
/// # Ok::<(), grafos_std::FabricError>(())
/// ```
pub struct KvBuilder {
    /// Bucket count for the hot-tier `FabricHashMap`.
    hot_buckets: usize,
    /// TTL applied by `put` when the caller gives no explicit TTL.
    default_ttl_secs: u32,
    /// Max serialized key size (bytes) in the hot tier.
    key_stride: usize,
    /// Max serialized `KvEntry` size (bytes) in the hot tier.
    value_stride: usize,
    /// Cold-tier size in blocks; `None` disables the cold tier.
    #[cfg(feature = "persistence")]
    cold_num_blocks: Option<u64>,
}
88
89impl KvBuilder {
90    pub fn new() -> Self {
91        KvBuilder {
92            hot_buckets: 64,
93            default_ttl_secs: 300,
94            key_stride: KEY_STRIDE,
95            value_stride: VALUE_STRIDE,
96            #[cfg(feature = "persistence")]
97            cold_num_blocks: None,
98        }
99    }
100
101    /// Set the number of buckets for the hot-tier hash map.
102    pub fn hot_buckets(mut self, n: usize) -> Self {
103        self.hot_buckets = n;
104        self
105    }
106
107    /// Set the default TTL in seconds for keys without an explicit TTL.
108    pub fn default_ttl_secs(mut self, secs: u32) -> Self {
109        self.default_ttl_secs = secs;
110        self
111    }
112
113    /// Set the key stride (max serialized key size in bytes).
114    pub fn key_stride(mut self, stride: usize) -> Self {
115        self.key_stride = stride;
116        self
117    }
118
119    /// Set the value stride (max serialized KvEntry size in bytes).
120    pub fn value_stride(mut self, stride: usize) -> Self {
121        self.value_stride = stride;
122        self
123    }
124
125    /// Enable the cold tier with the given number of blocks.
126    #[cfg(feature = "persistence")]
127    pub fn cold_num_blocks(mut self, n: u64) -> Self {
128        self.cold_num_blocks = Some(n);
129        self
130    }
131
132    /// Build the [`FabricKvStore`].
133    pub fn build(self) -> Result<FabricKvStore> {
134        let hot =
135            FabricHashMap::with_capacity(self.hot_buckets, self.key_stride, self.value_stride)?;
136
137        #[cfg(feature = "persistence")]
138        let cold = if let Some(num_blocks) = self.cold_num_blocks {
139            Some(ColdTier::new(num_blocks)?)
140        } else {
141            None
142        };
143
144        Ok(FabricKvStore {
145            hot,
146            default_ttl_secs: self.default_ttl_secs,
147            #[cfg(feature = "persistence")]
148            cold,
149        })
150    }
151
152    /// Recover a [`FabricKvStore`] from existing hot and cold leases.
153    ///
154    /// The hot tier is reconstructed by reading the `FabricHashMap` header
155    /// from the memory lease. The cold tier is reconstructed by scanning
156    /// the append-only log to find the write cursor.
157    ///
158    /// This enables full crash recovery when both leases survived.
159    #[cfg(feature = "persistence")]
160    pub fn from_leases(
161        hot_lease: grafos_std::mem::MemLease,
162        cold_lease: BlockLease,
163    ) -> Result<FabricKvStore> {
164        let hot = FabricHashMap::from_lease(hot_lease)?;
165        let cold = Some(ColdTier::from_lease(cold_lease)?);
166        Ok(FabricKvStore {
167            hot,
168            default_ttl_secs: 300,
169            cold,
170        })
171    }
172
173    /// Recover a [`FabricKvStore`] from a surviving cold-tier block lease.
174    ///
175    /// A fresh hot tier is allocated (via `FabricHashMap::with_capacity`).
176    /// Cold-tier entries are promoted to the hot tier on first access.
177    ///
178    /// This is the common crash-recovery path: the tasklet's hot-tier
179    /// memory lease expired with the tasklet, but the cold-tier block
180    /// lease survived.
181    #[cfg(feature = "persistence")]
182    pub fn from_cold(cold_lease: BlockLease, hot_buckets: usize) -> Result<FabricKvStore> {
183        let hot = FabricHashMap::with_capacity(hot_buckets, KEY_STRIDE, VALUE_STRIDE)?;
184        let cold = Some(ColdTier::from_lease(cold_lease)?);
185        Ok(FabricKvStore {
186            hot,
187            default_ttl_secs: 300,
188            cold,
189        })
190    }
191}
192
193impl Default for KvBuilder {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
/// Cold tier: append-only log of key-value pairs in block storage.
///
/// Layout per record:
/// ```text
/// key_len: u32 (4) | key: [u8; key_len] | value_len: u32 (4) | value: [u8; value_len] | flags: u8 (1)
/// ```
///
/// Flags: 0x00 = live, 0x01 = tombstoned. Records for the same key are
/// resolved last-writer-wins when scanning.
#[cfg(feature = "persistence")]
struct ColdTier {
    lease: BlockLease,
    /// Next byte offset within the block device for appending.
    write_cursor: u64,
}

/// Record flag: entry is live.
#[cfg(feature = "persistence")]
const COLD_FLAG_LIVE: u8 = 0x00;
/// Record flag: entry was deleted; masks earlier live records for that key.
#[cfg(feature = "persistence")]
const COLD_FLAG_TOMBSTONE: u8 = 0x01;
218
#[cfg(feature = "persistence")]
impl ColdTier {
    /// Acquire a fresh block lease of `num_blocks` blocks with an empty log.
    fn new(num_blocks: u64) -> Result<Self> {
        let lease = BlockBuilder::new().min_blocks(num_blocks).acquire()?;
        Ok(ColdTier {
            lease,
            write_cursor: 0,
        })
    }

    /// Recover a cold tier from an existing block lease.
    ///
    /// Scans the append-only log to find the write cursor position. This
    /// enables crash recovery: a replacement tasklet can attach to a
    /// surviving block lease and reconstruct the cold tier index.
    ///
    /// The cursor is only advanced past *complete* records: a torn or
    /// truncated record at the tail is left before the cursor, so the next
    /// append overwrites it instead of preserving garbage.
    ///
    /// NOTE(review): a key_len of 0 is treated as end-of-log, so records
    /// with empty keys cannot be recovered — confirm callers never spill
    /// empty keys, or reserve a nonzero sentinel.
    fn from_lease(lease: BlockLease) -> Result<Self> {
        let capacity = lease.block().num_blocks() * BLOCK_SIZE as u64;
        let mut cursor: u64 = 0;

        loop {
            // Tentatively parse the record starting at `cursor`; only
            // commit the cursor once the whole record is proven to fit.
            let mut end = cursor;

            if end + 4 > capacity {
                break;
            }
            let mut len_bytes = [0u8; 4];
            Self::read_exact_at(&lease, end, &mut len_bytes)?;
            let key_len = u32::from_le_bytes(len_bytes) as u64;

            // key_len == 0 means unwritten space (end of log); a huge
            // key_len means corruption — stop in either case.
            if key_len == 0 || key_len > capacity {
                break;
            }
            end += 4 + key_len;

            if end + 4 > capacity {
                break;
            }
            Self::read_exact_at(&lease, end, &mut len_bytes)?;
            let val_len = u32::from_le_bytes(len_bytes) as u64;
            if val_len > capacity {
                break;
            }
            end += 4 + val_len;

            // Flags byte must also fit for the record to be complete.
            if end + 1 > capacity {
                break;
            }
            end += 1;

            cursor = end;
        }

        Ok(ColdTier {
            lease,
            write_cursor: cursor,
        })
    }

    /// Total capacity of the leased block device in bytes.
    fn capacity_bytes(&self) -> u64 {
        self.lease.block().num_blocks() * BLOCK_SIZE as u64
    }

    /// Append a live key/value record.
    fn append_live(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        self.append_with_flag(key, value, COLD_FLAG_LIVE)
    }

    /// Append a tombstone record (empty value) for `key`.
    fn append_tombstone(&mut self, key: &[u8]) -> Result<()> {
        self.append_with_flag(key, &[], COLD_FLAG_TOMBSTONE)
    }

    /// Serialize one record and write it at the current cursor.
    ///
    /// # Errors
    /// Returns `CapacityExceeded` when the record does not fit in the
    /// remaining space; propagates block I/O errors.
    fn append_with_flag(&mut self, key: &[u8], value: &[u8], flag: u8) -> Result<()> {
        let record_len = 4 + key.len() + 4 + value.len() + 1;
        if self.write_cursor + record_len as u64 > self.capacity_bytes() {
            return Err(FabricError::CapacityExceeded);
        }

        let mut record = Vec::with_capacity(record_len);
        record.extend_from_slice(&(key.len() as u32).to_le_bytes());
        record.extend_from_slice(key);
        record.extend_from_slice(&(value.len() as u32).to_le_bytes());
        record.extend_from_slice(value);
        record.push(flag);

        // Write the record, chunked across block boundaries.
        let mut offset = self.write_cursor;
        let mut remaining = &record[..];
        while !remaining.is_empty() {
            let block_idx = offset / BLOCK_SIZE as u64;
            let block_offset = (offset % BLOCK_SIZE as u64) as usize;
            let chunk_len = remaining.len().min(BLOCK_SIZE - block_offset);

            // Read-modify-write so neighbouring records sharing the block
            // are preserved.
            let mut block = self.lease.block().read_block(block_idx)?;
            block[block_offset..block_offset + chunk_len]
                .copy_from_slice(&remaining[..chunk_len]);
            self.lease.block().write_block(block_idx, &block)?;

            remaining = &remaining[chunk_len..];
            offset += chunk_len as u64;
        }

        self.write_cursor += record_len as u64;
        Ok(())
    }

    /// Linear scan for `target_key`, honouring last-writer-wins: a later
    /// live record overrides an earlier one, and a tombstone clears it.
    fn scan(&self, target_key: &[u8]) -> Result<Option<Vec<u8>>> {
        let mut offset: u64 = 0;
        let mut result: Option<Vec<u8>> = None;

        while offset < self.write_cursor {
            let (key, value, flags, next) = self.read_record(offset)?;
            offset = next;

            if key == target_key {
                result = if flags == COLD_FLAG_LIVE {
                    Some(value)
                } else {
                    None
                };
            }
        }

        Ok(result)
    }

    /// Parse one full record at `offset`.
    ///
    /// Returns `(key, value, flags, next_offset)` where `next_offset` is
    /// the byte offset of the following record.
    fn read_record(&self, offset: u64) -> Result<(Vec<u8>, Vec<u8>, u8, u64)> {
        let mut off = offset;

        let key_len = self.read_u32_at(off)? as usize;
        off += 4;
        let key = self.read_bytes(off, key_len)?;
        off += key_len as u64;

        let val_len = self.read_u32_at(off)? as usize;
        off += 4;
        let value = self.read_bytes(off, val_len)?;
        off += val_len as u64;

        let flags = self.read_bytes(off, 1)?[0];
        off += 1;

        Ok((key, value, flags, off))
    }

    /// Read a little-endian u32 at `offset` (may span a block boundary).
    fn read_u32_at(&self, offset: u64) -> Result<u32> {
        let mut bytes = [0u8; 4];
        Self::read_exact_at(&self.lease, offset, &mut bytes)?;
        Ok(u32::from_le_bytes(bytes))
    }

    /// Read `len` bytes at `offset` into a fresh buffer.
    fn read_bytes(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
        // Avoid the `vec!` macro: under `no_std` only `alloc::vec::Vec` is
        // imported, so the macro is not in scope.
        let mut buf = Vec::new();
        buf.resize(len, 0u8);
        Self::read_exact_at(&self.lease, offset, &mut buf)?;
        Ok(buf)
    }

    /// Fill `buf` from the device starting at `offset`, chunked across
    /// block boundaries. Associated fn so `from_lease` can use it before
    /// `Self` is constructed.
    fn read_exact_at(lease: &BlockLease, offset: u64, buf: &mut [u8]) -> Result<()> {
        let mut filled = 0;
        let mut off = offset;
        while filled < buf.len() {
            let block_idx = off / BLOCK_SIZE as u64;
            let block_off = (off % BLOCK_SIZE as u64) as usize;
            let chunk_len = (buf.len() - filled).min(BLOCK_SIZE - block_off);

            let block = lease.block().read_block(block_idx)?;
            buf[filled..filled + chunk_len]
                .copy_from_slice(&block[block_off..block_off + chunk_len]);

            filled += chunk_len;
            off += chunk_len as u64;
        }
        Ok(())
    }

    /// Compact the cold tier by removing tombstoned and superseded entries.
    /// Rewrites all live entries from the beginning, then zeroes the
    /// reclaimed tail so a later `from_lease()` recovery scan stops at the
    /// new end of log instead of misparsing stale record bytes.
    fn compact(&mut self) -> Result<()> {
        // Fold the log into the latest state per key (last-writer-wins).
        let mut live_entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        let mut offset: u64 = 0;

        while offset < self.write_cursor {
            let (key, value, flags, next) = self.read_record(offset)?;
            offset = next;

            // Drop any earlier state for this key, then re-add if live.
            live_entries.retain(|(k, _)| k != &key);
            if flags == COLD_FLAG_LIVE {
                live_entries.push((key, value));
            }
        }

        // Rewind and rewrite only the surviving entries.
        let old_end = self.write_cursor;
        self.write_cursor = 0;
        for (key, value) in &live_entries {
            self.append_live(key, value)?;
        }

        // Erase the stale tail left behind by the shrink (fixes phantom
        // records reappearing after crash recovery).
        let new_end = self.write_cursor;
        self.zero_range(new_end, old_end)?;

        Ok(())
    }

    /// Zero the byte range `[start, end)` on the device (read-modify-write
    /// per block). No-op when `start >= end`.
    fn zero_range(&mut self, start: u64, end: u64) -> Result<()> {
        let mut off = start;
        while off < end {
            let block_idx = off / BLOCK_SIZE as u64;
            let block_off = (off % BLOCK_SIZE as u64) as usize;
            let chunk_len = ((BLOCK_SIZE - block_off) as u64).min(end - off) as usize;

            let mut block = self.lease.block().read_block(block_idx)?;
            for b in &mut block[block_off..block_off + chunk_len] {
                *b = 0;
            }
            self.lease.block().write_block(block_idx, &block)?;

            off += chunk_len as u64;
        }
        Ok(())
    }
}
477
/// A key-value store backed by leased fabric resources.
///
/// The hot tier stores entries in a [`FabricHashMap`] over leased DRAM.
/// Each entry tracks creation time, TTL, and access count. The
/// [`tick()`](FabricKvStore::tick) method evicts expired entries.
///
/// With the `persistence` feature, a cold tier backs overflow to block
/// storage. On a hot-tier miss, the cold tier is scanned and matching
/// entries are promoted back.
pub struct FabricKvStore {
    /// Hot tier: TTL-tracked entries keyed by raw key bytes.
    hot: FabricHashMap<Vec<u8>, KvEntry>,
    /// TTL (seconds) used by `put` and for entries promoted from cold.
    default_ttl_secs: u32,
    /// Optional cold tier (append-only log); `None` when not configured.
    #[cfg(feature = "persistence")]
    cold: Option<ColdTier>,
}
493
494impl FabricKvStore {
495    /// Store a key-value pair with the default TTL.
496    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
497        self.put_with_ttl(key, value, self.default_ttl_secs)
498    }
499
500    /// Store a key-value pair with an explicit TTL in seconds.
501    pub fn put_with_ttl(&mut self, key: &[u8], value: &[u8], ttl_secs: u32) -> Result<()> {
502        let now = host::unix_time_secs();
503        let entry = KvEntry {
504            value: value.to_vec(),
505            created_at: now,
506            ttl_secs,
507            access_count: 0,
508        };
509        let key_vec = key.to_vec();
510
511        match self.hot.insert(&key_vec, &entry) {
512            Ok(_) => Ok(()),
513            #[cfg(feature = "persistence")]
514            Err(FabricError::CapacityExceeded) => {
515                // Hot tier full — spill to cold tier
516                if let Some(ref mut cold) = self.cold {
517                    cold.append_live(key, value)?;
518                    Ok(())
519                } else {
520                    Err(FabricError::CapacityExceeded)
521                }
522            }
523            Err(e) => Err(e),
524        }
525    }
526
527    /// Retrieve a value by key.
528    ///
529    /// Checks the hot tier first. On a miss (with `persistence` enabled),
530    /// scans the cold tier and promotes the entry back to the hot tier.
531    ///
532    /// Accessing a key refreshes its TTL (resets `created_at` to now).
533    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
534        let key_vec = key.to_vec();
535        let now = host::unix_time_secs();
536
537        if let Some(mut entry) = self.hot.get(&key_vec)? {
538            // Check if expired
539            if now >= entry.created_at.saturating_add(entry.ttl_secs as u64) {
540                // Expired — remove it
541                self.hot.remove(&key_vec)?;
542                return Ok(None);
543            }
544            // Refresh TTL on access
545            entry.access_count += 1;
546            entry.created_at = now;
547            self.hot.insert(&key_vec, &entry)?;
548            return Ok(Some(entry.value));
549        }
550
551        #[cfg(feature = "persistence")]
552        if let Some(ref cold) = self.cold {
553            if let Some(value) = cold.scan(key)? {
554                // Promote to hot tier
555                let entry = KvEntry {
556                    value: value.clone(),
557                    created_at: now,
558                    ttl_secs: self.default_ttl_secs,
559                    access_count: 1,
560                };
561                // Best-effort promote; if hot is full, still return the value
562                let _ = self.hot.insert(&key_vec, &entry);
563                return Ok(Some(value));
564            }
565        }
566
567        Ok(None)
568    }
569
570    /// Delete a key. Returns `true` if the key existed.
571    pub fn delete(&mut self, key: &[u8]) -> Result<bool> {
572        let key_vec = key.to_vec();
573        let removed = self.hot.remove(&key_vec)?.is_some();
574
575        #[cfg(feature = "persistence")]
576        if let Some(ref mut cold) = self.cold {
577            // Append a tombstone to cold tier
578            cold.append_tombstone(key)?;
579            // If something was found in cold but not hot, we still consider it deleted
580            // The tombstone prevents future cold-tier scans from returning it
581        }
582
583        Ok(removed)
584    }
585
586    /// Check if a key exists (without reading the value).
587    pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
588        // Use get() which also handles expiry checking
589        Ok(self.get(key)?.is_some())
590    }
591
592    /// Return all non-expired keys in the hot tier.
593    pub fn keys(&self) -> Result<Vec<Vec<u8>>> {
594        let now = host::unix_time_secs();
595        let mut result = Vec::new();
596        for item in self.hot.iter() {
597            let (key, entry) = item?;
598            if now < entry.created_at.saturating_add(entry.ttl_secs as u64) {
599                result.push(key);
600            }
601        }
602        Ok(result)
603    }
604
605    /// Return the remaining TTL for a key in seconds, or `None` if the key
606    /// does not exist or is expired.
607    pub fn ttl(&self, key: &[u8]) -> Result<Option<u32>> {
608        let key_vec = key.to_vec();
609        let now = host::unix_time_secs();
610
611        if let Some(entry) = self.hot.get(&key_vec)? {
612            let expires_at = entry.created_at.saturating_add(entry.ttl_secs as u64);
613            if now < expires_at {
614                return Ok(Some((expires_at - now) as u32));
615            }
616        }
617        Ok(None)
618    }
619
620    /// Scan the hot tier and evict expired entries.
621    pub fn tick(&mut self) -> Result<usize> {
622        let now = host::unix_time_secs();
623        let mut expired_keys = Vec::new();
624
625        for item in self.hot.iter() {
626            let (key, entry) = item?;
627            if now >= entry.created_at.saturating_add(entry.ttl_secs as u64) {
628                expired_keys.push(key);
629            }
630        }
631
632        let count = expired_keys.len();
633        for key in &expired_keys {
634            self.hot.remove(key)?;
635        }
636        Ok(count)
637    }
638
639    /// Store a serializable struct as a value.
640    pub fn put_struct<T: Serialize>(&mut self, key: &[u8], value: &T) -> Result<()> {
641        let bytes = postcard::to_allocvec(value).map_err(|_| FabricError::IoError(-1))?;
642        self.put(key, &bytes)
643    }
644
645    /// Store a serializable struct with an explicit TTL.
646    pub fn put_struct_with_ttl<T: Serialize>(
647        &mut self,
648        key: &[u8],
649        value: &T,
650        ttl_secs: u32,
651    ) -> Result<()> {
652        let bytes = postcard::to_allocvec(value).map_err(|_| FabricError::IoError(-1))?;
653        self.put_with_ttl(key, &bytes, ttl_secs)
654    }
655
656    /// Retrieve and deserialize a struct by key.
657    pub fn get_struct<T: serde::de::DeserializeOwned>(&mut self, key: &[u8]) -> Result<Option<T>> {
658        match self.get(key)? {
659            Some(bytes) => {
660                let val: T = postcard::from_bytes(&bytes).map_err(|_| FabricError::IoError(-1))?;
661                Ok(Some(val))
662            }
663            None => Ok(None),
664        }
665    }
666
667    /// Compact the cold tier, removing tombstoned and duplicate entries.
668    ///
669    /// Only available with the `persistence` feature.
670    #[cfg(feature = "persistence")]
671    pub fn compact_cold(&mut self) -> Result<()> {
672        if let Some(ref mut cold) = self.cold {
673            cold.compact()
674        } else {
675            Ok(())
676        }
677    }
678}
679
680#[cfg(test)]
681mod tests {
682    use super::*;
683    use grafos_std::host;
684
685    fn setup() {
686        host::reset_mock();
687        host::mock_set_fbmu_arena_size(65536);
688        host::mock_set_fbbu_num_blocks(256);
689    }
690
691    #[test]
692    fn put_get_delete_roundtrip() {
693        setup();
694        let mut kv = KvBuilder::new()
695            .hot_buckets(64)
696            .default_ttl_secs(300)
697            .build()
698            .expect("build");
699
700        kv.put(b"key1", b"value1").expect("put");
701        kv.put(b"key2", b"value2").expect("put");
702
703        assert_eq!(kv.get(b"key1").expect("get"), Some(b"value1".to_vec()));
704        assert_eq!(kv.get(b"key2").expect("get"), Some(b"value2".to_vec()));
705        assert_eq!(kv.get(b"key3").expect("get"), None);
706
707        assert!(kv.delete(b"key1").expect("delete"));
708        assert_eq!(kv.get(b"key1").expect("get"), None);
709        assert!(!kv.delete(b"nonexistent").expect("delete"));
710    }
711
712    #[test]
713    fn put_struct_get_struct() {
714        setup();
715        let mut kv = KvBuilder::new().build().expect("build");
716
717        #[derive(Serialize, Deserialize, Debug, PartialEq)]
718        struct MyData {
719            count: u64,
720            name: Vec<u8>,
721        }
722
723        let data = MyData {
724            count: 42,
725            name: b"test".to_vec(),
726        };
727        kv.put_struct(b"structured", &data).expect("put_struct");
728
729        let retrieved: MyData = kv
730            .get_struct(b"structured")
731            .expect("get_struct")
732            .expect("some");
733        assert_eq!(retrieved, data);
734    }
735
736    #[test]
737    fn ttl_expiry() {
738        setup();
739        let mut kv = KvBuilder::new()
740            .default_ttl_secs(10)
741            .build()
742            .expect("build");
743
744        kv.put(b"ephemeral", b"data").expect("put");
745        assert_eq!(kv.get(b"ephemeral").expect("get"), Some(b"data".to_vec()));
746        assert!(kv.ttl(b"ephemeral").expect("ttl").is_some());
747
748        // Advance time past TTL
749        host::mock_advance_time_secs(11);
750
751        assert_eq!(kv.get(b"ephemeral").expect("get"), None);
752        assert_eq!(kv.ttl(b"ephemeral").expect("ttl"), None);
753    }
754
755    #[test]
756    fn ttl_renewal_on_access() {
757        setup();
758        let mut kv = KvBuilder::new()
759            .default_ttl_secs(10)
760            .build()
761            .expect("build");
762
763        kv.put(b"renew-me", b"data").expect("put");
764
765        // Advance time by 8 seconds (within TTL)
766        host::mock_advance_time_secs(8);
767
768        // Access the key — this refreshes created_at to now
769        let val = kv.get(b"renew-me").expect("get");
770        assert_eq!(val, Some(b"data".to_vec()));
771
772        // Advance by another 8 seconds (16 total from start, but only 8 from last access)
773        host::mock_advance_time_secs(8);
774
775        // Should still be alive because access renewed it
776        let val = kv.get(b"renew-me").expect("get");
777        assert_eq!(val, Some(b"data".to_vec()));
778    }
779
780    #[test]
781    fn exists_and_keys() {
782        setup();
783        let mut kv = KvBuilder::new()
784            .default_ttl_secs(300)
785            .build()
786            .expect("build");
787
788        kv.put(b"a", b"1").expect("put");
789        kv.put(b"b", b"2").expect("put");
790        kv.put(b"c", b"3").expect("put");
791
792        assert!(kv.exists(b"a").expect("exists"));
793        assert!(kv.exists(b"b").expect("exists"));
794        assert!(!kv.exists(b"nonexistent").expect("exists"));
795
796        let mut keys = kv.keys().expect("keys");
797        keys.sort();
798        assert_eq!(keys, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
799    }
800
801    #[test]
802    fn tick_evicts_expired() {
803        setup();
804        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");
805
806        kv.put(b"short", b"val").expect("put");
807        kv.put_with_ttl(b"long", b"val", 100).expect("put");
808
809        host::mock_advance_time_secs(6);
810
811        let evicted = kv.tick().expect("tick");
812        assert_eq!(evicted, 1);
813
814        assert_eq!(kv.get(b"short").expect("get"), None);
815        assert_eq!(kv.get(b"long").expect("get"), Some(b"val".to_vec()));
816    }
817
818    #[test]
819    fn keys_excludes_expired() {
820        setup();
821        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");
822
823        kv.put(b"expire-soon", b"val").expect("put");
824        kv.put_with_ttl(b"lives-long", b"val", 100).expect("put");
825
826        host::mock_advance_time_secs(6);
827
828        let keys = kv.keys().expect("keys");
829        assert_eq!(keys, vec![b"lives-long".to_vec()]);
830    }
831
832    #[test]
833    fn put_with_explicit_ttl() {
834        setup();
835        let mut kv = KvBuilder::new()
836            .default_ttl_secs(300)
837            .build()
838            .expect("build");
839
840        kv.put_with_ttl(b"short", b"val", 5).expect("put");
841        kv.put_with_ttl(b"long", b"val", 600).expect("put");
842
843        // After 6 seconds, short key should be expired, long should not
844        host::mock_advance_time_secs(6);
845
846        assert_eq!(kv.get(b"short").expect("get"), None);
847        assert_eq!(kv.get(b"long").expect("get"), Some(b"val".to_vec()));
848    }
849
850    #[test]
851    fn ttl_remaining_decreases() {
852        setup();
853        let mut kv = KvBuilder::new()
854            .default_ttl_secs(100)
855            .build()
856            .expect("build");
857
858        kv.put(b"k", b"v").expect("put");
859
860        let ttl1 = kv.ttl(b"k").expect("ttl").expect("some");
861        assert_eq!(ttl1, 100);
862
863        host::mock_advance_time_secs(30);
864        let ttl2 = kv.ttl(b"k").expect("ttl").expect("some");
865        assert_eq!(ttl2, 70);
866
867        host::mock_advance_time_secs(71);
868        // Past expiry
869        assert_eq!(kv.ttl(b"k").expect("ttl"), None);
870    }
871
872    #[test]
873    fn tick_mixed_ttls_evicts_selectively() {
874        setup();
875        let mut kv = KvBuilder::new()
876            .default_ttl_secs(100)
877            .build()
878            .expect("build");
879
880        kv.put_with_ttl(b"a", b"1", 5).expect("put");
881        kv.put_with_ttl(b"b", b"2", 20).expect("put");
882        kv.put_with_ttl(b"c", b"3", 100).expect("put");
883
884        // After 6s: "a" expired
885        host::mock_advance_time_secs(6);
886        let evicted = kv.tick().expect("tick");
887        assert_eq!(evicted, 1);
888        // "a" should be gone, "b" and "c" should remain
889        assert_eq!(kv.keys().expect("keys").len(), 2);
890
891        // After 21s total: "b" expired too
892        // Note: don't use get() between ticks — it refreshes TTL on access.
893        host::mock_advance_time_secs(15);
894        let evicted = kv.tick().expect("tick");
895        assert_eq!(evicted, 1);
896        // Only "c" should remain
897        let keys = kv.keys().expect("keys");
898        assert_eq!(keys.len(), 1);
899        assert_eq!(keys[0], b"c".to_vec());
900    }
901
902    #[test]
903    fn tick_on_empty_store() {
904        setup();
905        let mut kv = KvBuilder::new().build().expect("build");
906        let evicted = kv.tick().expect("tick");
907        assert_eq!(evicted, 0);
908    }
909
910    #[test]
911    fn overwrite_resets_ttl() {
912        setup();
913        let mut kv = KvBuilder::new()
914            .default_ttl_secs(10)
915            .build()
916            .expect("build");
917
918        kv.put(b"k", b"v1").expect("put");
919        host::mock_advance_time_secs(8);
920
921        // Overwrite with new value — should reset TTL
922        kv.put(b"k", b"v2").expect("put");
923
924        // After 5 more seconds (13 total), original would have expired
925        // but overwrite reset TTL, so it should still be alive
926        host::mock_advance_time_secs(5);
927        assert_eq!(kv.get(b"k").expect("get"), Some(b"v2".to_vec()));
928    }
929
930    #[test]
931    fn delete_nonexistent_returns_false() {
932        setup();
933        let mut kv = KvBuilder::new().build().expect("build");
934        assert!(!kv.delete(b"nope").expect("delete"));
935    }
936
937    #[test]
938    fn exists_returns_false_for_expired() {
939        setup();
940        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");
941
942        kv.put(b"k", b"v").expect("put");
943        assert!(kv.exists(b"k").expect("exists"));
944
945        host::mock_advance_time_secs(6);
946        assert!(!kv.exists(b"k").expect("exists after expiry"));
947    }
948
949    #[test]
950    fn builder_custom_strides() {
951        setup();
952        // Use custom strides
953        let mut kv = KvBuilder::new()
954            .hot_buckets(16)
955            .key_stride(64)
956            .value_stride(256)
957            .default_ttl_secs(60)
958            .build()
959            .expect("build");
960
961        kv.put(b"key", b"val").expect("put");
962        assert_eq!(kv.get(b"key").expect("get"), Some(b"val".to_vec()));
963    }
964
965    #[test]
966    fn get_on_expired_entry_removes_it_from_hot() {
967        setup();
968        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");
969
970        kv.put(b"k", b"v").expect("put");
971        assert_eq!(kv.keys().expect("keys").len(), 1);
972
973        host::mock_advance_time_secs(6);
974
975        // get() on an expired entry should return None AND remove it
976        assert_eq!(kv.get(b"k").expect("get"), None);
977
978        // tick() should find nothing to evict since get() already cleaned up
979        let evicted = kv.tick().expect("tick");
980        assert_eq!(evicted, 0);
981    }
982
983    #[cfg(feature = "persistence")]
984    mod cold_tier_tests {
985        use super::*;
986
987        fn setup_with_cold() {
988            host::reset_mock();
989            host::mock_set_fbmu_arena_size(65536);
990            host::mock_set_fbbu_num_blocks(256);
991        }
992
993        /// Set up mock with a small hot tier that actually triggers cold spill.
994        ///
995        /// bucket_size = 1 + 8 + 4 + 128 + 4 + 512 = 657
996        /// 4 buckets: HEADER(32) + 4 * 657 = 2660 bytes
997        /// 75% load factor → max 3 entries, 4th spills to cold.
998        fn setup_small_hot() {
999            host::reset_mock();
1000            host::mock_set_fbmu_arena_size(2660);
1001            host::mock_set_fbbu_num_blocks(256);
1002        }
1003
1004        #[test]
1005        fn cold_tier_spill_and_promote() {
1006            setup_small_hot();
1007
1008            let mut kv = KvBuilder::new()
1009                .hot_buckets(4)
1010                .default_ttl_secs(300)
1011                .cold_num_blocks(64)
1012                .build()
1013                .expect("build");
1014
1015            // Fill hot tier to 75% capacity (3 of 4 buckets)
1016            kv.put(b"a", b"1").expect("put a");
1017            kv.put(b"b", b"2").expect("put b");
1018            kv.put(b"c", b"3").expect("put c");
1019
1020            // 4th insert exceeds load factor → spills to cold tier
1021            kv.put(b"d", b"4").expect("put d (cold spill)");
1022            assert!(
1023                kv.cold.as_ref().unwrap().write_cursor > 0,
1024                "d should have spilled to cold tier"
1025            );
1026
1027            // Key "d" should be findable via cold tier scan
1028            let val = kv.get(b"d").expect("get d");
1029            assert_eq!(val, Some(b"4".to_vec()));
1030        }
1031
1032        #[test]
1033        fn cold_tier_compaction() {
1034            setup_small_hot();
1035            let mut kv = KvBuilder::new()
1036                .hot_buckets(4)
1037                .default_ttl_secs(300)
1038                .cold_num_blocks(64)
1039                .build()
1040                .expect("build");
1041
1042            // Fill hot tier (3 entries at 75% of 4 buckets)
1043            kv.put(b"a", b"1").expect("put");
1044            kv.put(b"b", b"2").expect("put");
1045            kv.put(b"c", b"3").expect("put");
1046
1047            // Spill to cold
1048            kv.put(b"d", b"4").expect("put d");
1049            kv.put(b"e", b"5").expect("put e");
1050
1051            // Delete a cold-tier key (writes tombstone)
1052            kv.delete(b"d").expect("delete d");
1053
1054            // Compact should remove the tombstoned entry
1055            kv.compact_cold().expect("compact");
1056
1057            // "e" should still be findable
1058            let val = kv.get(b"e").expect("get e");
1059            assert_eq!(val, Some(b"5".to_vec()));
1060
1061            // "d" should be gone
1062            let val = kv.get(b"d").expect("get d");
1063            assert_eq!(val, None);
1064        }
1065
1066        #[test]
1067        fn cold_tier_multiple_writes_last_writer_wins() {
1068            setup_small_hot();
1069            let mut kv = KvBuilder::new()
1070                .hot_buckets(4)
1071                .default_ttl_secs(300)
1072                .cold_num_blocks(64)
1073                .build()
1074                .expect("build");
1075
1076            // Fill hot tier (3 entries at 75% of 4 buckets)
1077            kv.put(b"a", b"1").expect("put a");
1078            kv.put(b"b", b"2").expect("put b");
1079            kv.put(b"c", b"3").expect("put c");
1080
1081            // Spill to cold with same key multiple times
1082            kv.put(b"d", b"first").expect("put d first");
1083            kv.put(b"d", b"second").expect("put d second");
1084
1085            // Last value should win (cold tier scan returns last live entry)
1086            let val = kv.get(b"d").expect("get d");
1087            assert_eq!(val, Some(b"second".to_vec()));
1088        }
1089
1090        #[test]
1091        fn cold_tier_promote_to_hot_on_read() {
1092            setup_small_hot();
1093            let mut kv = KvBuilder::new()
1094                .hot_buckets(4)
1095                .default_ttl_secs(300)
1096                .cold_num_blocks(64)
1097                .build()
1098                .expect("build");
1099
1100            // Fill hot tier (3 entries at 75% of 4 buckets)
1101            kv.put(b"a", b"1").expect("put a");
1102            kv.put(b"b", b"2").expect("put b");
1103            kv.put(b"c", b"3").expect("put c");
1104
1105            // Spill "d" to cold
1106            kv.put(b"d", b"4").expect("put d");
1107
1108            // Delete a hot-tier key to free space
1109            kv.delete(b"a").expect("delete a");
1110
1111            // Now read "d" — should promote from cold to hot
1112            let val = kv.get(b"d").expect("get d");
1113            assert_eq!(val, Some(b"4".to_vec()));
1114
1115            // Reading "d" again should now hit the hot tier (promoted)
1116            // We can verify by checking it still works
1117            let val2 = kv.get(b"d").expect("get d again");
1118            assert_eq!(val2, Some(b"4".to_vec()));
1119        }
1120
1121        #[test]
1122        fn cold_tier_tombstone_prevents_read() {
1123            setup_small_hot();
1124            let mut kv = KvBuilder::new()
1125                .hot_buckets(4)
1126                .default_ttl_secs(300)
1127                .cold_num_blocks(64)
1128                .build()
1129                .expect("build");
1130
1131            // Fill hot tier (3 entries at 75% of 4 buckets)
1132            kv.put(b"a", b"1").expect("put a");
1133            kv.put(b"b", b"2").expect("put b");
1134            kv.put(b"c", b"3").expect("put c");
1135
1136            // Spill to cold
1137            kv.put(b"d", b"4").expect("put d");
1138
1139            // Delete "d" — writes tombstone to cold
1140            kv.delete(b"d").expect("delete d");
1141
1142            // "d" should not be found (tombstone blocks it)
1143            let val = kv.get(b"d").expect("get d");
1144            assert_eq!(val, None);
1145        }
1146
1147        #[test]
1148        fn compact_cold_removes_duplicates() {
1149            setup_small_hot();
1150            let mut kv = KvBuilder::new()
1151                .hot_buckets(4)
1152                .default_ttl_secs(300)
1153                .cold_num_blocks(64)
1154                .build()
1155                .expect("build");
1156
1157            // Fill hot tier (3 entries at 75% of 4 buckets)
1158            kv.put(b"a", b"1").expect("put a");
1159            kv.put(b"b", b"2").expect("put b");
1160            kv.put(b"c", b"3").expect("put c");
1161
1162            // Write several versions to cold
1163            kv.put(b"d", b"v1").expect("put d v1");
1164            kv.put(b"d", b"v2").expect("put d v2");
1165            kv.put(b"d", b"v3").expect("put d v3");
1166            kv.put(b"e", b"x1").expect("put e x1");
1167            kv.put(b"e", b"x2").expect("put e x2");
1168
1169            // Compact — should deduplicate, keeping latest
1170            kv.compact_cold().expect("compact");
1171
1172            // Values should still be correct
1173            let val_d = kv.get(b"d").expect("get d");
1174            assert_eq!(val_d, Some(b"v3".to_vec()));
1175            let val_e = kv.get(b"e").expect("get e");
1176            assert_eq!(val_e, Some(b"x2".to_vec()));
1177        }
1178
1179        #[test]
1180        fn hot_full_without_cold_returns_capacity_exceeded() {
1181            host::reset_mock();
1182            // Small arena: bucket_size = 1+8+4+128+4+512 = 657.
1183            // For exactly 4 buckets: 32 + 4*657 = 2660 bytes.
1184            // 75% load factor = 3 entries max.
1185            host::mock_set_fbmu_arena_size(2660);
1186            host::mock_set_fbbu_num_blocks(256);
1187
1188            // Build without cold tier
1189            let mut kv = KvBuilder::new()
1190                .hot_buckets(4)
1191                .default_ttl_secs(300)
1192                .build()
1193                .expect("build");
1194
1195            // Fill hot tier to 75% load (3 of 4 buckets)
1196            kv.put(b"a", b"1").expect("put a");
1197            kv.put(b"b", b"2").expect("put b");
1198            kv.put(b"c", b"3").expect("put c");
1199
1200            // 4th insert should fail — no cold tier to spill to
1201            let err = kv.put(b"d", b"4").unwrap_err();
1202            assert_eq!(err, FabricError::CapacityExceeded);
1203        }
1204
1205        #[test]
1206        fn compact_cold_on_store_without_cold_is_noop() {
1207            setup_with_cold();
1208            let mut kv = KvBuilder::new()
1209                .hot_buckets(64)
1210                .default_ttl_secs(300)
1211                .build()
1212                .expect("build");
1213
1214            // Should not error
1215            kv.compact_cold().expect("compact on no cold");
1216        }
1217
1218        #[test]
1219        fn from_cold_recovers_spilled_keys() {
1220            host::reset_mock();
1221            host::mock_set_fbbu_num_blocks(256);
1222
1223            // bucket_size = 1 + 8 + 4 + 128 + 4 + 512 = 657
1224            // For exactly 4 buckets: HEADER(32) + 4 * 657 = 2660 bytes
1225            // 75% load factor: max 3 entries before CapacityExceeded
1226            host::mock_set_fbmu_arena_size(2660);
1227
1228            // Build a store, spill data to cold tier, then simulate crash.
1229            let mut kv = KvBuilder::new()
1230                .hot_buckets(4)
1231                .default_ttl_secs(300)
1232                .cold_num_blocks(64)
1233                .build()
1234                .expect("build");
1235
1236            // Fill hot tier to 75% capacity (3 of 4 buckets)
1237            kv.put(b"a", b"1").expect("put a");
1238            kv.put(b"b", b"2").expect("put b");
1239            kv.put(b"c", b"3").expect("put c");
1240
1241            // 4th insert exceeds 75% load factor → spills to cold tier
1242            kv.put(b"cold-key", b"cold-value").expect("put cold");
1243            assert!(
1244                kv.cold.as_ref().unwrap().write_cursor > 0,
1245                "cold-key should have spilled to cold tier"
1246            );
1247
1248            // Grab lease ID, then simulate a crash (forget = no Drop = no free).
1249            let cold_lease_id = kv.cold.as_ref().unwrap().lease.lease_id();
1250            core::mem::forget(kv);
1251
1252            // Bump arena size for recovery hot tier allocation (64 buckets).
1253            host::mock_set_fbmu_arena_size(65536);
1254
1255            // Recover from cold tier only (hot tier lease "expired").
1256            let cold_lease =
1257                grafos_std::block::BlockBuilder::attach(cold_lease_id).expect("attach cold");
1258            let mut recovered = KvBuilder::from_cold(cold_lease, 64).expect("from_cold");
1259
1260            // Cold-tier key should be accessible via promote-on-read.
1261            let val = recovered.get(b"cold-key").expect("get cold-key");
1262            assert_eq!(val, Some(b"cold-value".to_vec()));
1263        }
1264
1265        #[test]
1266        fn from_leases_recovers_hot_and_cold() {
1267            setup_with_cold();
1268
1269            let mut kv = KvBuilder::new()
1270                .hot_buckets(64)
1271                .default_ttl_secs(300)
1272                .cold_num_blocks(64)
1273                .build()
1274                .expect("build");
1275
1276            kv.put(b"hot-key", b"hot-value").expect("put hot");
1277
1278            let hot_lease_id = kv.hot.lease_id();
1279            let cold_lease_id = kv.cold.as_ref().unwrap().lease.lease_id();
1280            core::mem::forget(kv);
1281
1282            // Recover from both leases.
1283            let hot_lease = grafos_std::mem::MemBuilder::attach(hot_lease_id).expect("attach hot");
1284            let cold_lease =
1285                grafos_std::block::BlockBuilder::attach(cold_lease_id).expect("attach cold");
1286            let mut recovered = KvBuilder::from_leases(hot_lease, cold_lease).expect("from_leases");
1287
1288            // Hot-tier key should be directly accessible.
1289            let val = recovered.get(b"hot-key").expect("get hot-key");
1290            assert_eq!(val, Some(b"hot-value".to_vec()));
1291        }
1292    }
1293}