1#![cfg_attr(not(feature = "std"), no_std)]
37
38extern crate alloc;
39
40use alloc::vec::Vec;
41
42use grafos_collections::map::FabricHashMap;
43use grafos_std::error::{FabricError, Result};
44use grafos_std::host;
45
46use serde::{Deserialize, Serialize};
47
48#[cfg(feature = "persistence")]
49use grafos_std::block::{BlockBuilder, BlockLease, BLOCK_SIZE};
50
/// Default key stride passed to `FabricHashMap::with_capacity` by [`KvBuilder`]
/// (presumably bytes reserved per key slot — TODO confirm against `FabricHashMap`).
const KEY_STRIDE: usize = 128;
/// Default value stride passed to `FabricHashMap::with_capacity` by [`KvBuilder`]
/// (presumably bytes reserved per serialized entry — TODO confirm against `FabricHashMap`).
const VALUE_STRIDE: usize = 512;
55
/// Record stored in the hot-tier map: the value plus the metadata used for TTL checks.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct KvEntry {
    /// Raw value bytes as supplied by the caller.
    value: Vec<u8>,
    /// `host::unix_time_secs()` reading taken on insert; `get` overwrites it on every hit.
    created_at: u64,
    /// Lifetime in seconds, counted from `created_at`.
    ttl_secs: u32,
    /// Number of successful `get` calls served from this entry.
    access_count: u64,
}
64
/// Builder that collects the configuration for a [`FabricKvStore`].
pub struct KvBuilder {
    /// Capacity argument for the hot-tier `FabricHashMap`.
    hot_buckets: usize,
    /// TTL (seconds) applied by `FabricKvStore::put` when no explicit TTL is given.
    default_ttl_secs: u32,
    /// Key stride forwarded to `FabricHashMap::with_capacity`.
    key_stride: usize,
    /// Value stride forwarded to `FabricHashMap::with_capacity`.
    value_stride: usize,
    /// Number of blocks to request for the cold tier; `None` means no cold tier.
    #[cfg(feature = "persistence")]
    cold_num_blocks: Option<u64>,
}
88
89impl KvBuilder {
90 pub fn new() -> Self {
91 KvBuilder {
92 hot_buckets: 64,
93 default_ttl_secs: 300,
94 key_stride: KEY_STRIDE,
95 value_stride: VALUE_STRIDE,
96 #[cfg(feature = "persistence")]
97 cold_num_blocks: None,
98 }
99 }
100
101 pub fn hot_buckets(mut self, n: usize) -> Self {
103 self.hot_buckets = n;
104 self
105 }
106
107 pub fn default_ttl_secs(mut self, secs: u32) -> Self {
109 self.default_ttl_secs = secs;
110 self
111 }
112
113 pub fn key_stride(mut self, stride: usize) -> Self {
115 self.key_stride = stride;
116 self
117 }
118
119 pub fn value_stride(mut self, stride: usize) -> Self {
121 self.value_stride = stride;
122 self
123 }
124
125 #[cfg(feature = "persistence")]
127 pub fn cold_num_blocks(mut self, n: u64) -> Self {
128 self.cold_num_blocks = Some(n);
129 self
130 }
131
132 pub fn build(self) -> Result<FabricKvStore> {
134 let hot =
135 FabricHashMap::with_capacity(self.hot_buckets, self.key_stride, self.value_stride)?;
136
137 #[cfg(feature = "persistence")]
138 let cold = if let Some(num_blocks) = self.cold_num_blocks {
139 Some(ColdTier::new(num_blocks)?)
140 } else {
141 None
142 };
143
144 Ok(FabricKvStore {
145 hot,
146 default_ttl_secs: self.default_ttl_secs,
147 #[cfg(feature = "persistence")]
148 cold,
149 })
150 }
151
152 #[cfg(feature = "persistence")]
160 pub fn from_leases(
161 hot_lease: grafos_std::mem::MemLease,
162 cold_lease: BlockLease,
163 ) -> Result<FabricKvStore> {
164 let hot = FabricHashMap::from_lease(hot_lease)?;
165 let cold = Some(ColdTier::from_lease(cold_lease)?);
166 Ok(FabricKvStore {
167 hot,
168 default_ttl_secs: 300,
169 cold,
170 })
171 }
172
173 #[cfg(feature = "persistence")]
182 pub fn from_cold(cold_lease: BlockLease, hot_buckets: usize) -> Result<FabricKvStore> {
183 let hot = FabricHashMap::with_capacity(hot_buckets, KEY_STRIDE, VALUE_STRIDE)?;
184 let cold = Some(ColdTier::from_lease(cold_lease)?);
185 Ok(FabricKvStore {
186 hot,
187 default_ttl_secs: 300,
188 cold,
189 })
190 }
191}
192
193impl Default for KvBuilder {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
/// Append-only record log kept on a block lease.
///
/// Each record is laid out as
/// `[key_len: u32 LE][key][val_len: u32 LE][value][flag: u8]` and records may
/// straddle block boundaries.
#[cfg(feature = "persistence")]
struct ColdTier {
    /// Block lease that backs the log.
    lease: BlockLease,
    /// Byte offset just past the last record; the next append starts here.
    write_cursor: u64,
}
213
/// Trailing record flag: the record carries the current value for its key.
#[cfg(feature = "persistence")]
const COLD_FLAG_LIVE: u8 = 0x00;
/// Trailing record flag: the key was deleted (the record is written with an empty value).
#[cfg(feature = "persistence")]
const COLD_FLAG_TOMBSTONE: u8 = 0x01;
218
219#[cfg(feature = "persistence")]
220impl ColdTier {
221 fn new(num_blocks: u64) -> Result<Self> {
222 let lease = BlockBuilder::new().min_blocks(num_blocks).acquire()?;
223 Ok(ColdTier {
224 lease,
225 write_cursor: 0,
226 })
227 }
228
229 fn from_lease(lease: BlockLease) -> Result<Self> {
235 let capacity = lease.block().num_blocks() * BLOCK_SIZE as u64;
236 let mut cursor: u64 = 0;
237
238 loop {
242 if cursor + 4 > capacity {
243 break;
244 }
245 let mut key_len_bytes = [0u8; 4];
247 let mut bytes_read = 0;
248 let mut off = cursor;
249 while bytes_read < 4 {
250 let bi = off / BLOCK_SIZE as u64;
251 let bo = (off % BLOCK_SIZE as u64) as usize;
252 let b = lease.block().read_block(bi)?;
253 let avail = BLOCK_SIZE - bo;
254 let take = (4 - bytes_read).min(avail);
255 key_len_bytes[bytes_read..bytes_read + take].copy_from_slice(&b[bo..bo + take]);
256 bytes_read += take;
257 off += take as u64;
258 }
259 let key_len = u32::from_le_bytes(key_len_bytes) as u64;
260
261 if key_len == 0 {
263 break;
264 }
265 if key_len > capacity {
267 break;
268 }
269 cursor += 4 + key_len;
270
271 if cursor + 4 > capacity {
273 break;
274 }
275 let mut val_len_bytes = [0u8; 4];
276 bytes_read = 0;
277 off = cursor;
278 while bytes_read < 4 {
279 let bi = off / BLOCK_SIZE as u64;
280 let bo = (off % BLOCK_SIZE as u64) as usize;
281 let b = lease.block().read_block(bi)?;
282 let avail = BLOCK_SIZE - bo;
283 let take = (4 - bytes_read).min(avail);
284 val_len_bytes[bytes_read..bytes_read + take].copy_from_slice(&b[bo..bo + take]);
285 bytes_read += take;
286 off += take as u64;
287 }
288 let val_len = u32::from_le_bytes(val_len_bytes) as u64;
289 if val_len > capacity {
290 break;
291 }
292 cursor += 4 + val_len;
293
294 if cursor + 1 > capacity {
296 break;
297 }
298 cursor += 1;
299 }
300
301 Ok(ColdTier {
302 lease,
303 write_cursor: cursor,
304 })
305 }
306
307 fn capacity_bytes(&self) -> u64 {
308 self.lease.block().num_blocks() * BLOCK_SIZE as u64
309 }
310
311 fn append_live(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
312 self.append_with_flag(key, value, COLD_FLAG_LIVE)
313 }
314
315 fn append_tombstone(&mut self, key: &[u8]) -> Result<()> {
316 self.append_with_flag(key, &[], COLD_FLAG_TOMBSTONE)
317 }
318
319 fn append_with_flag(&mut self, key: &[u8], value: &[u8], flag: u8) -> Result<()> {
320 let record_len = 4 + key.len() + 4 + value.len() + 1;
321 if self.write_cursor + record_len as u64 > self.capacity_bytes() {
322 return Err(FabricError::CapacityExceeded);
323 }
324
325 let mut record = Vec::with_capacity(record_len);
326 record.extend_from_slice(&(key.len() as u32).to_le_bytes());
327 record.extend_from_slice(key);
328 record.extend_from_slice(&(value.len() as u32).to_le_bytes());
329 record.extend_from_slice(value);
330 record.push(flag);
331
332 let mut offset = self.write_cursor;
334 let mut remaining = &record[..];
335 while !remaining.is_empty() {
336 let block_idx = offset / BLOCK_SIZE as u64;
337 let block_offset = (offset % BLOCK_SIZE as u64) as usize;
338 let space = BLOCK_SIZE - block_offset;
339 let chunk_len = remaining.len().min(space);
340
341 let mut block = self.lease.block().read_block(block_idx)?;
343 block[block_offset..block_offset + chunk_len].copy_from_slice(&remaining[..chunk_len]);
344 self.lease.block().write_block(block_idx, &block)?;
345
346 remaining = &remaining[chunk_len..];
347 offset += chunk_len as u64;
348 }
349
350 self.write_cursor += record_len as u64;
351 Ok(())
352 }
353
354 fn scan(&self, target_key: &[u8]) -> Result<Option<Vec<u8>>> {
355 let mut offset: u64 = 0;
356 let mut result: Option<Vec<u8>> = None;
357
358 while offset < self.write_cursor {
359 let key_len_bytes = self.read_bytes(offset, 4)?;
361 let key_len = u32::from_le_bytes([
362 key_len_bytes[0],
363 key_len_bytes[1],
364 key_len_bytes[2],
365 key_len_bytes[3],
366 ]) as usize;
367 offset += 4;
368
369 let key = self.read_bytes(offset, key_len)?;
370 offset += key_len as u64;
371
372 let val_len_bytes = self.read_bytes(offset, 4)?;
373 let val_len = u32::from_le_bytes([
374 val_len_bytes[0],
375 val_len_bytes[1],
376 val_len_bytes[2],
377 val_len_bytes[3],
378 ]) as usize;
379 offset += 4;
380
381 let value = self.read_bytes(offset, val_len)?;
382 offset += val_len as u64;
383
384 let flags_bytes = self.read_bytes(offset, 1)?;
385 let flags = flags_bytes[0];
386 offset += 1;
387
388 if key == target_key {
389 if flags == COLD_FLAG_LIVE {
390 result = Some(value);
391 } else {
392 result = None;
393 }
394 }
395 }
396
397 Ok(result)
398 }
399
400 fn read_bytes(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
401 let mut buf = vec![0u8; len];
402 let mut buf_offset = 0;
403 let mut disk_offset = offset;
404
405 while buf_offset < len {
406 let block_idx = disk_offset / BLOCK_SIZE as u64;
407 let block_off = (disk_offset % BLOCK_SIZE as u64) as usize;
408 let space = BLOCK_SIZE - block_off;
409 let chunk_len = (len - buf_offset).min(space);
410
411 let block = self.lease.block().read_block(block_idx)?;
412 buf[buf_offset..buf_offset + chunk_len]
413 .copy_from_slice(&block[block_off..block_off + chunk_len]);
414
415 buf_offset += chunk_len;
416 disk_offset += chunk_len as u64;
417 }
418
419 Ok(buf)
420 }
421
422 fn compact(&mut self) -> Result<()> {
425 let mut live_entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
427 let mut offset: u64 = 0;
428
429 while offset < self.write_cursor {
431 let key_len_bytes = self.read_bytes(offset, 4)?;
432 let key_len = u32::from_le_bytes([
433 key_len_bytes[0],
434 key_len_bytes[1],
435 key_len_bytes[2],
436 key_len_bytes[3],
437 ]) as usize;
438 offset += 4;
439
440 let key = self.read_bytes(offset, key_len)?;
441 offset += key_len as u64;
442
443 let val_len_bytes = self.read_bytes(offset, 4)?;
444 let val_len = u32::from_le_bytes([
445 val_len_bytes[0],
446 val_len_bytes[1],
447 val_len_bytes[2],
448 val_len_bytes[3],
449 ]) as usize;
450 offset += 4;
451
452 let value = self.read_bytes(offset, val_len)?;
453 offset += val_len as u64;
454
455 let flags_bytes = self.read_bytes(offset, 1)?;
456 let flags = flags_bytes[0];
457 offset += 1;
458
459 if flags == COLD_FLAG_LIVE {
460 live_entries.retain(|(k, _)| k != &key);
462 live_entries.push((key, value));
463 } else {
464 live_entries.retain(|(k, _)| k != &key);
465 }
466 }
467
468 self.write_cursor = 0;
470 for (key, value) in &live_entries {
471 self.append_live(key, value)?;
472 }
473
474 Ok(())
475 }
476}
477
/// Key-value store with a TTL-aware hot tier and, with the `persistence`
/// feature, an optional append-only cold tier.
pub struct FabricKvStore {
    /// Hot tier: key bytes mapped to the value plus its TTL metadata.
    hot: FabricHashMap<Vec<u8>, KvEntry>,
    /// TTL in seconds used by `put` and when promoting cold entries in `get`.
    default_ttl_secs: u32,
    /// Cold tier that receives entries the hot map rejects with `CapacityExceeded`.
    #[cfg(feature = "persistence")]
    cold: Option<ColdTier>,
}
493
494impl FabricKvStore {
495 pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
497 self.put_with_ttl(key, value, self.default_ttl_secs)
498 }
499
    /// Stores `value` under `key` with an explicit TTL in seconds.
    ///
    /// The entry is stamped with the current `host::unix_time_secs()` and
    /// inserted into the hot map. With the `persistence` feature, a
    /// `CapacityExceeded` from the hot map spills the raw key/value to the
    /// cold tier when one is configured.
    ///
    /// NOTE(review): the cold record stores only key and value, so `ttl_secs`
    /// is dropped for spilled entries.
    ///
    /// # Errors
    /// Returns `FabricError::CapacityExceeded` when the hot map is full and no
    /// cold tier can take the entry; propagates other map and block I/O errors.
    pub fn put_with_ttl(&mut self, key: &[u8], value: &[u8], ttl_secs: u32) -> Result<()> {
        let now = host::unix_time_secs();
        let entry = KvEntry {
            value: value.to_vec(),
            created_at: now,
            ttl_secs,
            access_count: 0,
        };
        let key_vec = key.to_vec();

        match self.hot.insert(&key_vec, &entry) {
            Ok(_) => Ok(()),
            // Hot tier is full: fall back to the cold log if there is one.
            #[cfg(feature = "persistence")]
            Err(FabricError::CapacityExceeded) => {
                if let Some(ref mut cold) = self.cold {
                    cold.append_live(key, value)?;
                    Ok(())
                } else {
                    Err(FabricError::CapacityExceeded)
                }
            }
            Err(e) => Err(e),
        }
    }
526
527 pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
534 let key_vec = key.to_vec();
535 let now = host::unix_time_secs();
536
537 if let Some(mut entry) = self.hot.get(&key_vec)? {
538 if now >= entry.created_at.saturating_add(entry.ttl_secs as u64) {
540 self.hot.remove(&key_vec)?;
542 return Ok(None);
543 }
544 entry.access_count += 1;
546 entry.created_at = now;
547 self.hot.insert(&key_vec, &entry)?;
548 return Ok(Some(entry.value));
549 }
550
551 #[cfg(feature = "persistence")]
552 if let Some(ref cold) = self.cold {
553 if let Some(value) = cold.scan(key)? {
554 let entry = KvEntry {
556 value: value.clone(),
557 created_at: now,
558 ttl_secs: self.default_ttl_secs,
559 access_count: 1,
560 };
561 let _ = self.hot.insert(&key_vec, &entry);
563 return Ok(Some(value));
564 }
565 }
566
567 Ok(None)
568 }
569
570 pub fn delete(&mut self, key: &[u8]) -> Result<bool> {
572 let key_vec = key.to_vec();
573 let removed = self.hot.remove(&key_vec)?.is_some();
574
575 #[cfg(feature = "persistence")]
576 if let Some(ref mut cold) = self.cold {
577 cold.append_tombstone(key)?;
579 }
582
583 Ok(removed)
584 }
585
586 pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
588 Ok(self.get(key)?.is_some())
590 }
591
592 pub fn keys(&self) -> Result<Vec<Vec<u8>>> {
594 let now = host::unix_time_secs();
595 let mut result = Vec::new();
596 for item in self.hot.iter() {
597 let (key, entry) = item?;
598 if now < entry.created_at.saturating_add(entry.ttl_secs as u64) {
599 result.push(key);
600 }
601 }
602 Ok(result)
603 }
604
605 pub fn ttl(&self, key: &[u8]) -> Result<Option<u32>> {
608 let key_vec = key.to_vec();
609 let now = host::unix_time_secs();
610
611 if let Some(entry) = self.hot.get(&key_vec)? {
612 let expires_at = entry.created_at.saturating_add(entry.ttl_secs as u64);
613 if now < expires_at {
614 return Ok(Some((expires_at - now) as u32));
615 }
616 }
617 Ok(None)
618 }
619
620 pub fn tick(&mut self) -> Result<usize> {
622 let now = host::unix_time_secs();
623 let mut expired_keys = Vec::new();
624
625 for item in self.hot.iter() {
626 let (key, entry) = item?;
627 if now >= entry.created_at.saturating_add(entry.ttl_secs as u64) {
628 expired_keys.push(key);
629 }
630 }
631
632 let count = expired_keys.len();
633 for key in &expired_keys {
634 self.hot.remove(key)?;
635 }
636 Ok(count)
637 }
638
639 pub fn put_struct<T: Serialize>(&mut self, key: &[u8], value: &T) -> Result<()> {
641 let bytes = postcard::to_allocvec(value).map_err(|_| FabricError::IoError(-1))?;
642 self.put(key, &bytes)
643 }
644
645 pub fn put_struct_with_ttl<T: Serialize>(
647 &mut self,
648 key: &[u8],
649 value: &T,
650 ttl_secs: u32,
651 ) -> Result<()> {
652 let bytes = postcard::to_allocvec(value).map_err(|_| FabricError::IoError(-1))?;
653 self.put_with_ttl(key, &bytes, ttl_secs)
654 }
655
656 pub fn get_struct<T: serde::de::DeserializeOwned>(&mut self, key: &[u8]) -> Result<Option<T>> {
658 match self.get(key)? {
659 Some(bytes) => {
660 let val: T = postcard::from_bytes(&bytes).map_err(|_| FabricError::IoError(-1))?;
661 Ok(Some(val))
662 }
663 None => Ok(None),
664 }
665 }
666
667 #[cfg(feature = "persistence")]
671 pub fn compact_cold(&mut self) -> Result<()> {
672 if let Some(ref mut cold) = self.cold {
673 cold.compact()
674 } else {
675 Ok(())
676 }
677 }
678}
679
680#[cfg(test)]
681mod tests {
682 use super::*;
683 use grafos_std::host;
684
    /// Resets the mock host and configures a 65536-byte memory arena and 256 blocks.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);
        host::mock_set_fbbu_num_blocks(256);
    }
690
    /// Values written with `put` are readable with `get`, and `delete` removes
    /// them and reports whether the key was present.
    #[test]
    fn put_get_delete_roundtrip() {
        setup();
        let mut kv = KvBuilder::new()
            .hot_buckets(64)
            .default_ttl_secs(300)
            .build()
            .expect("build");

        kv.put(b"key1", b"value1").expect("put");
        kv.put(b"key2", b"value2").expect("put");

        assert_eq!(kv.get(b"key1").expect("get"), Some(b"value1".to_vec()));
        assert_eq!(kv.get(b"key2").expect("get"), Some(b"value2".to_vec()));
        assert_eq!(kv.get(b"key3").expect("get"), None);

        assert!(kv.delete(b"key1").expect("delete"));
        assert_eq!(kv.get(b"key1").expect("get"), None);
        assert!(!kv.delete(b"nonexistent").expect("delete"));
    }
711
    /// A serde struct survives a `put_struct` / `get_struct` round trip unchanged.
    #[test]
    fn put_struct_get_struct() {
        setup();
        let mut kv = KvBuilder::new().build().expect("build");

        #[derive(Serialize, Deserialize, Debug, PartialEq)]
        struct MyData {
            count: u64,
            name: Vec<u8>,
        }

        let data = MyData {
            count: 42,
            name: b"test".to_vec(),
        };
        kv.put_struct(b"structured", &data).expect("put_struct");

        let retrieved: MyData = kv
            .get_struct(b"structured")
            .expect("get_struct")
            .expect("some");
        assert_eq!(retrieved, data);
    }
735
    /// An entry is readable inside its TTL and gone once the mock clock passes it.
    #[test]
    fn ttl_expiry() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(10)
            .build()
            .expect("build");

        kv.put(b"ephemeral", b"data").expect("put");
        assert_eq!(kv.get(b"ephemeral").expect("get"), Some(b"data".to_vec()));
        assert!(kv.ttl(b"ephemeral").expect("ttl").is_some());

        // Move one second past the 10-second TTL.
        host::mock_advance_time_secs(11);

        assert_eq!(kv.get(b"ephemeral").expect("get"), None);
        assert_eq!(kv.ttl(b"ephemeral").expect("ttl"), None);
    }
754
    /// A successful `get` restarts the TTL window, so the entry outlives its
    /// original deadline.
    #[test]
    fn ttl_renewal_on_access() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(10)
            .build()
            .expect("build");

        kv.put(b"renew-me", b"data").expect("put");

        // t = 8: still inside the original 10-second window.
        host::mock_advance_time_secs(8);

        // This read refreshes the entry's timestamp.
        let val = kv.get(b"renew-me").expect("get");
        assert_eq!(val, Some(b"data".to_vec()));

        // t = 16: past the original deadline, but only 8 seconds after the refresh.
        host::mock_advance_time_secs(8);

        let val = kv.get(b"renew-me").expect("get");
        assert_eq!(val, Some(b"data".to_vec()));
    }
779
    /// `exists` reflects presence and `keys` lists every stored key.
    #[test]
    fn exists_and_keys() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(300)
            .build()
            .expect("build");

        kv.put(b"a", b"1").expect("put");
        kv.put(b"b", b"2").expect("put");
        kv.put(b"c", b"3").expect("put");

        assert!(kv.exists(b"a").expect("exists"));
        assert!(kv.exists(b"b").expect("exists"));
        assert!(!kv.exists(b"nonexistent").expect("exists"));

        // Sorted before comparing, so the assertion does not depend on iteration order.
        let mut keys = kv.keys().expect("keys");
        keys.sort();
        assert_eq!(keys, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
    }
800
    /// `tick` removes only the entry whose TTL has elapsed and reports the count.
    #[test]
    fn tick_evicts_expired() {
        setup();
        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");

        kv.put(b"short", b"val").expect("put");
        kv.put_with_ttl(b"long", b"val", 100).expect("put");

        host::mock_advance_time_secs(6);

        let evicted = kv.tick().expect("tick");
        assert_eq!(evicted, 1);

        assert_eq!(kv.get(b"short").expect("get"), None);
        assert_eq!(kv.get(b"long").expect("get"), Some(b"val".to_vec()));
    }
817
    /// `keys` filters out expired entries even when `tick` has not run.
    #[test]
    fn keys_excludes_expired() {
        setup();
        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");

        kv.put(b"expire-soon", b"val").expect("put");
        kv.put_with_ttl(b"lives-long", b"val", 100).expect("put");

        host::mock_advance_time_secs(6);

        let keys = kv.keys().expect("keys");
        assert_eq!(keys, vec![b"lives-long".to_vec()]);
    }
831
    /// A per-entry TTL given to `put_with_ttl` overrides the store default.
    #[test]
    fn put_with_explicit_ttl() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(300)
            .build()
            .expect("build");

        kv.put_with_ttl(b"short", b"val", 5).expect("put");
        kv.put_with_ttl(b"long", b"val", 600).expect("put");

        // Past the 5-second TTL, well inside the 600-second one.
        host::mock_advance_time_secs(6);

        assert_eq!(kv.get(b"short").expect("get"), None);
        assert_eq!(kv.get(b"long").expect("get"), Some(b"val".to_vec()));
    }
849
    /// `ttl` counts down with the mock clock and returns `None` after expiry.
    #[test]
    fn ttl_remaining_decreases() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(100)
            .build()
            .expect("build");

        kv.put(b"k", b"v").expect("put");

        let ttl1 = kv.ttl(b"k").expect("ttl").expect("some");
        assert_eq!(ttl1, 100);

        host::mock_advance_time_secs(30);
        let ttl2 = kv.ttl(b"k").expect("ttl").expect("some");
        assert_eq!(ttl2, 70);

        // 101 seconds in total: one past the 100-second TTL.
        host::mock_advance_time_secs(71);
        assert_eq!(kv.ttl(b"k").expect("ttl"), None);
    }
871
    /// Successive `tick` calls evict entries one by one as their TTLs elapse.
    #[test]
    fn tick_mixed_ttls_evicts_selectively() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(100)
            .build()
            .expect("build");

        kv.put_with_ttl(b"a", b"1", 5).expect("put");
        kv.put_with_ttl(b"b", b"2", 20).expect("put");
        kv.put_with_ttl(b"c", b"3", 100).expect("put");

        // t = 6: only "a" (TTL 5) has expired.
        host::mock_advance_time_secs(6);
        let evicted = kv.tick().expect("tick");
        assert_eq!(evicted, 1);
        assert_eq!(kv.keys().expect("keys").len(), 2);

        // t = 21: "b" (TTL 20) has now expired as well.
        host::mock_advance_time_secs(15);
        let evicted = kv.tick().expect("tick");
        assert_eq!(evicted, 1);
        let keys = kv.keys().expect("keys");
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], b"c".to_vec());
    }
901
    /// `tick` on an empty store evicts nothing.
    #[test]
    fn tick_on_empty_store() {
        setup();
        let mut kv = KvBuilder::new().build().expect("build");
        let evicted = kv.tick().expect("tick");
        assert_eq!(evicted, 0);
    }
909
    /// Overwriting a key starts a fresh TTL window for the new value.
    #[test]
    fn overwrite_resets_ttl() {
        setup();
        let mut kv = KvBuilder::new()
            .default_ttl_secs(10)
            .build()
            .expect("build");

        kv.put(b"k", b"v1").expect("put");
        host::mock_advance_time_secs(8);

        // Second write at t = 8 replaces the value and its timestamp.
        kv.put(b"k", b"v2").expect("put");

        // t = 13: past the first write's deadline, 5 seconds after the overwrite.
        host::mock_advance_time_secs(5);
        assert_eq!(kv.get(b"k").expect("get"), Some(b"v2".to_vec()));
    }
929
    /// Deleting a key that was never stored returns `false`.
    #[test]
    fn delete_nonexistent_returns_false() {
        setup();
        let mut kv = KvBuilder::new().build().expect("build");
        assert!(!kv.delete(b"nope").expect("delete"));
    }
936
    /// `exists` turns `false` once the entry's TTL has elapsed.
    #[test]
    fn exists_returns_false_for_expired() {
        setup();
        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");

        kv.put(b"k", b"v").expect("put");
        assert!(kv.exists(b"k").expect("exists"));

        host::mock_advance_time_secs(6);
        assert!(!kv.exists(b"k").expect("exists after expiry"));
    }
948
    /// A store built with non-default bucket count and strides still round-trips a value.
    #[test]
    fn builder_custom_strides() {
        setup();
        let mut kv = KvBuilder::new()
            .hot_buckets(16)
            .key_stride(64)
            .value_stride(256)
            .default_ttl_secs(60)
            .build()
            .expect("build");

        kv.put(b"key", b"val").expect("put");
        assert_eq!(kv.get(b"key").expect("get"), Some(b"val".to_vec()));
    }
964
    /// A `get` that hits an expired entry removes it, leaving nothing for `tick` to evict.
    #[test]
    fn get_on_expired_entry_removes_it_from_hot() {
        setup();
        let mut kv = KvBuilder::new().default_ttl_secs(5).build().expect("build");

        kv.put(b"k", b"v").expect("put");
        assert_eq!(kv.keys().expect("keys").len(), 1);

        host::mock_advance_time_secs(6);

        // The read itself evicts the expired entry.
        assert_eq!(kv.get(b"k").expect("get"), None);

        // So a following tick finds nothing left to remove.
        let evicted = kv.tick().expect("tick");
        assert_eq!(evicted, 0);
    }
982
983 #[cfg(feature = "persistence")]
984 mod cold_tier_tests {
985 use super::*;
986
        /// Resets the mock host with a 65536-byte memory arena and 256 blocks.
        fn setup_with_cold() {
            host::reset_mock();
            host::mock_set_fbmu_arena_size(65536);
            host::mock_set_fbbu_num_blocks(256);
        }
992
        /// Resets the mock host with a 2660-byte memory arena and 256 blocks.
        ///
        /// The tests that use this with `hot_buckets(4)` expect the fourth
        /// distinct key to be rejected by the hot map.
        fn setup_small_hot() {
            host::reset_mock();
            host::mock_set_fbmu_arena_size(2660);
            host::mock_set_fbbu_num_blocks(256);
        }
1003
        /// A put that overflows the hot map lands in the cold log and is still readable.
        #[test]
        fn cold_tier_spill_and_promote() {
            setup_small_hot();

            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            // Three entries are expected to fill the hot map.
            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            kv.put(b"d", b"4").expect("put d (cold spill)");
            // A non-zero cursor shows a record was appended to the cold log.
            assert!(
                kv.cold.as_ref().unwrap().write_cursor > 0,
                "d should have spilled to cold tier"
            );

            let val = kv.get(b"d").expect("get d");
            assert_eq!(val, Some(b"4".to_vec()));
        }
1031
        /// After compaction, live cold keys are still readable and deleted ones stay gone.
        #[test]
        fn cold_tier_compaction() {
            setup_small_hot();
            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            // Fill the hot map.
            kv.put(b"a", b"1").expect("put");
            kv.put(b"b", b"2").expect("put");
            kv.put(b"c", b"3").expect("put");

            // These two are expected to spill to the cold log.
            kv.put(b"d", b"4").expect("put d");
            kv.put(b"e", b"5").expect("put e");

            kv.delete(b"d").expect("delete d");

            kv.compact_cold().expect("compact");

            let val = kv.get(b"e").expect("get e");
            assert_eq!(val, Some(b"5".to_vec()));

            let val = kv.get(b"d").expect("get d");
            assert_eq!(val, None);
        }
1065
        /// When a key is spilled twice, a read returns the most recent value.
        #[test]
        fn cold_tier_multiple_writes_last_writer_wins() {
            setup_small_hot();
            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            // Fill the hot map.
            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            // Both writes of "d" are expected to go to the cold log.
            kv.put(b"d", b"first").expect("put d first");
            kv.put(b"d", b"second").expect("put d second");

            let val = kv.get(b"d").expect("get d");
            assert_eq!(val, Some(b"second".to_vec()));
        }
1089
        /// A cold key stays readable across repeated reads once room is freed in the hot map.
        #[test]
        fn cold_tier_promote_to_hot_on_read() {
            setup_small_hot();
            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            // Fill the hot map.
            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            // Expected to spill to the cold log.
            kv.put(b"d", b"4").expect("put d");

            // Free a hot slot so the next read can promote "d".
            kv.delete(b"a").expect("delete a");

            let val = kv.get(b"d").expect("get d");
            assert_eq!(val, Some(b"4".to_vec()));

            // The second read must return the same value.
            let val2 = kv.get(b"d").expect("get d again");
            assert_eq!(val2, Some(b"4".to_vec()));
        }
1120
        /// Deleting a spilled key hides it from subsequent reads.
        #[test]
        fn cold_tier_tombstone_prevents_read() {
            setup_small_hot();
            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            // Fill the hot map.
            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            // Expected to spill to the cold log.
            kv.put(b"d", b"4").expect("put d");

            kv.delete(b"d").expect("delete d");

            let val = kv.get(b"d").expect("get d");
            assert_eq!(val, None);
        }
1146
        /// Compaction keeps the latest value of keys that were written several times.
        #[test]
        fn compact_cold_removes_duplicates() {
            setup_small_hot();
            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            // Fill the hot map.
            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            // Repeated writes of "d" and "e" are expected to pile up in the cold log.
            kv.put(b"d", b"v1").expect("put d v1");
            kv.put(b"d", b"v2").expect("put d v2");
            kv.put(b"d", b"v3").expect("put d v3");
            kv.put(b"e", b"x1").expect("put e x1");
            kv.put(b"e", b"x2").expect("put e x2");

            kv.compact_cold().expect("compact");

            let val_d = kv.get(b"d").expect("get d");
            assert_eq!(val_d, Some(b"v3".to_vec()));
            let val_e = kv.get(b"e").expect("get e");
            assert_eq!(val_e, Some(b"x2".to_vec()));
        }
1178
        /// Without a cold tier, overflowing the hot map surfaces `CapacityExceeded`.
        #[test]
        fn hot_full_without_cold_returns_capacity_exceeded() {
            host::reset_mock();
            host::mock_set_fbmu_arena_size(2660);
            host::mock_set_fbbu_num_blocks(256);

            // No `cold_num_blocks` call: the store has no cold tier.
            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .build()
                .expect("build");

            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            let err = kv.put(b"d", b"4").unwrap_err();
            assert_eq!(err, FabricError::CapacityExceeded);
        }
1204
        /// `compact_cold` succeeds on a store that was built without a cold tier.
        #[test]
        fn compact_cold_on_store_without_cold_is_noop() {
            setup_with_cold();
            let mut kv = KvBuilder::new()
                .hot_buckets(64)
                .default_ttl_secs(300)
                .build()
                .expect("build");

            kv.compact_cold().expect("compact on no cold");
        }
1217
        /// A key spilled to the cold log is readable from a store rebuilt with `from_cold`.
        #[test]
        fn from_cold_recovers_spilled_keys() {
            host::reset_mock();
            host::mock_set_fbbu_num_blocks(256);

            // Small arena so the fourth key overflows the hot map.
            host::mock_set_fbmu_arena_size(2660);

            let mut kv = KvBuilder::new()
                .hot_buckets(4)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            kv.put(b"a", b"1").expect("put a");
            kv.put(b"b", b"2").expect("put b");
            kv.put(b"c", b"3").expect("put c");

            kv.put(b"cold-key", b"cold-value").expect("put cold");
            assert!(
                kv.cold.as_ref().unwrap().write_cursor > 0,
                "cold-key should have spilled to cold tier"
            );

            let cold_lease_id = kv.cold.as_ref().unwrap().lease.lease_id();
            // Forget the store without running destructors (presumably so the
            // leases are not released — TODO confirm lease drop semantics).
            core::mem::forget(kv);

            // Larger arena for the recovered store's fresh hot map.
            host::mock_set_fbmu_arena_size(65536);

            let cold_lease =
                grafos_std::block::BlockBuilder::attach(cold_lease_id).expect("attach cold");
            let mut recovered = KvBuilder::from_cold(cold_lease, 64).expect("from_cold");

            let val = recovered.get(b"cold-key").expect("get cold-key");
            assert_eq!(val, Some(b"cold-value".to_vec()));
        }
1264
        /// A hot entry is readable from a store rebuilt with `from_leases` on the same leases.
        #[test]
        fn from_leases_recovers_hot_and_cold() {
            setup_with_cold();

            let mut kv = KvBuilder::new()
                .hot_buckets(64)
                .default_ttl_secs(300)
                .cold_num_blocks(64)
                .build()
                .expect("build");

            kv.put(b"hot-key", b"hot-value").expect("put hot");

            let hot_lease_id = kv.hot.lease_id();
            let cold_lease_id = kv.cold.as_ref().unwrap().lease.lease_id();
            // Forget the store without running destructors (presumably so the
            // leases are not released — TODO confirm lease drop semantics).
            core::mem::forget(kv);

            let hot_lease = grafos_std::mem::MemBuilder::attach(hot_lease_id).expect("attach hot");
            let cold_lease =
                grafos_std::block::BlockBuilder::attach(cold_lease_id).expect("attach cold");
            let mut recovered = KvBuilder::from_leases(hot_lease, cold_lease).expect("from_leases");

            let val = recovered.get(b"hot-key").expect("get hot-key");
            assert_eq!(val, Some(b"hot-value".to_vec()));
        }
1292 }
1293}