grafos_store/
tiered_store.rs

1//! Tiered object store: hot (memory) + cold (block) with LRU eviction.
2
3extern crate alloc;
4use alloc::collections::VecDeque;
5use alloc::string::String;
6use alloc::vec::Vec;
7
8use grafos_std::error::Result;
9
10use crate::block_store::BlockObjectStore;
11use crate::mem_store::MemObjectStore;
12#[cfg(feature = "versioning")]
13use crate::meta::VersionInfo;
14use crate::meta::{ObjectInfo, PutOptions};
15use crate::store::{ObjectData, ObjectStore};
16use crate::uri::FabricUri;
17
18/// Tiered object store composing a hot (memory) and cold (block) tier.
19///
20/// New objects are written to the hot tier. When the hot tier is full,
21/// the least recently used object is demoted to the cold tier. On a
22/// hot-tier miss, the cold tier is consulted and matching objects are
23/// promoted back to the hot tier.
24///
25/// # Example
26///
27/// ```rust
28/// use grafos_store::{TieredObjectStore, ObjectStore, FabricUri};
29///
30/// # grafos_std::host::reset_mock();
31/// # grafos_std::host::mock_set_fbmu_arena_size(131072);
32/// # grafos_std::host::mock_set_fbbu_num_blocks(512);
33/// let mut store = TieredObjectStore::new(32, 256, 16)?;
34/// let uri: FabricUri = "fabric://p/b/k".parse()?;
35/// store.put(&uri, b"data", None)?;
36/// let obj = store.get(&uri)?.unwrap();
37/// assert_eq!(obj.data, b"data");
38/// # Ok::<(), grafos_std::FabricError>(())
39/// ```
40pub struct TieredObjectStore {
41    hot: MemObjectStore,
42    cold: BlockObjectStore,
43    /// LRU tracking: front = least recently used, back = most recently used.
44    lru: VecDeque<String>,
45    /// Maximum number of objects in the hot tier before eviction.
46    max_hot: usize,
47}
48
49impl TieredObjectStore {
50    /// Create a tiered store.
51    ///
52    /// - `hot_buckets`: number of hash map buckets for the hot tier
53    /// - `cold_blocks`: number of blocks for the cold tier
54    /// - `max_hot`: maximum objects in hot tier before LRU eviction
55    pub fn new(hot_buckets: usize, cold_blocks: u64, max_hot: usize) -> Result<Self> {
56        let hot = MemObjectStore::new(hot_buckets)?;
57        let cold = BlockObjectStore::new(cold_blocks)?;
58        Ok(TieredObjectStore {
59            hot,
60            cold,
61            lru: VecDeque::new(),
62            max_hot,
63        })
64    }
65
66    /// Touch a URI in the LRU list (move to back = most recently used).
67    fn touch_lru(&mut self, uri_str: &str) {
68        if let Some(pos) = self.lru.iter().position(|s| s == uri_str) {
69            self.lru.remove(pos);
70        }
71        self.lru.push_back(String::from(uri_str));
72    }
73
74    /// Remove a URI from the LRU list.
75    fn remove_lru(&mut self, uri_str: &str) {
76        if let Some(pos) = self.lru.iter().position(|s| s == uri_str) {
77            self.lru.remove(pos);
78        }
79    }
80
81    /// Evict the least recently used object from hot to cold tier.
82    fn evict_lru(&mut self) -> Result<()> {
83        if let Some(victim_str) = self.lru.pop_front() {
84            if let Ok(victim_uri) = victim_str.parse::<FabricUri>() {
85                // Read from hot tier
86                if let Some(obj) = self.hot.get(&victim_uri)? {
87                    // Write to cold tier
88                    let opts = PutOptions {
89                        content_type: Some(obj.info.content_type.clone()),
90                        ..Default::default()
91                    };
92                    self.cold.put(&victim_uri, &obj.data, Some(opts))?;
93                }
94                // Remove from hot tier
95                let _ = self.hot.delete(&victim_uri);
96            }
97        }
98        Ok(())
99    }
100
101    /// Checkpoint the cold tier index to block storage.
102    pub fn checkpoint(&self) -> Result<()> {
103        self.cold.checkpoint_index()
104    }
105}
106
107impl ObjectStore for TieredObjectStore {
108    fn put(&mut self, uri: &FabricUri, data: &[u8], opts: Option<PutOptions>) -> Result<()> {
109        let uri_str = uri.to_string();
110
111        // Evict from hot tier if at capacity (and this is a new key)
112        if self.hot.head(uri)?.is_none() && self.lru.len() >= self.max_hot {
113            self.evict_lru()?;
114        }
115
116        // Write to hot tier
117        self.hot.put(uri, data, opts)?;
118        self.touch_lru(&uri_str);
119
120        // Also remove from cold if it was promoted earlier (avoid stale copies)
121        let _ = self.cold.delete(uri);
122
123        Ok(())
124    }
125
126    fn get(&self, uri: &FabricUri) -> Result<Option<ObjectData>> {
127        // Check hot tier first
128        if let Some(obj) = self.hot.get(uri)? {
129            // LRU touch requires &mut self; we handle this in the mutable wrapper below
130            return Ok(Some(obj));
131        }
132
133        // Check cold tier
134        self.cold.get(uri)
135    }
136
137    fn head(&self, uri: &FabricUri) -> Result<Option<ObjectInfo>> {
138        if let Some(info) = self.hot.head(uri)? {
139            return Ok(Some(info));
140        }
141        self.cold.head(uri)
142    }
143
144    fn delete(&mut self, uri: &FabricUri) -> Result<bool> {
145        let uri_str = uri.to_string();
146        self.remove_lru(&uri_str);
147        let hot_deleted = self.hot.delete(uri)?;
148        let cold_deleted = self.cold.delete(uri)?;
149        Ok(hot_deleted || cold_deleted)
150    }
151
152    fn list(&self, pool: &str, bucket: &str, prefix: &str) -> Result<Vec<String>> {
153        let mut hot_keys = self.hot.list(pool, bucket, prefix)?;
154        let cold_keys = self.cold.list(pool, bucket, prefix)?;
155
156        // Merge, dedup
157        for k in cold_keys {
158            if !hot_keys.contains(&k) {
159                hot_keys.push(k);
160            }
161        }
162        hot_keys.sort();
163        Ok(hot_keys)
164    }
165
166    #[cfg(feature = "versioning")]
167    fn get_version(&self, uri: &FabricUri, version: u64) -> Result<Option<ObjectData>> {
168        // Versions are stored in the hot tier (where puts go first)
169        if let Some(obj) = self.hot.get_version(uri, version)? {
170            return Ok(Some(obj));
171        }
172        self.cold.get_version(uri, version)
173    }
174
175    #[cfg(feature = "versioning")]
176    fn list_versions(&self, pool: &str, bucket: &str, key: &str) -> Result<Vec<VersionInfo>> {
177        let mut hot = self.hot.list_versions(pool, bucket, key)?;
178        let cold = self.cold.list_versions(pool, bucket, key)?;
179        // Merge and dedup by version number
180        for v in cold {
181            if !hot.iter().any(|h| h.version == v.version) {
182                hot.push(v);
183            }
184        }
185        hot.sort_by_key(|v| v.version);
186        Ok(hot)
187    }
188
189    #[cfg(feature = "versioning")]
190    fn delete_version(&mut self, uri: &FabricUri, version: u64) -> Result<bool> {
191        let hot_deleted = self.hot.delete_version(uri, version)?;
192        let cold_deleted = self.cold.delete_version(uri, version)?;
193        Ok(hot_deleted || cold_deleted)
194    }
195}
196
197/// Mutable get with LRU tracking and cold-to-hot promotion.
198impl TieredObjectStore {
199    /// Get an object, promoting from cold tier to hot tier on a miss.
200    ///
201    /// This is the preferred get method as it maintains LRU state and
202    /// promotes cold objects to the hot tier for faster subsequent access.
203    pub fn get_mut(&mut self, uri: &FabricUri) -> Result<Option<ObjectData>> {
204        let uri_str = uri.to_string();
205
206        // Check hot tier first
207        if let Some(obj) = self.hot.get(uri)? {
208            self.touch_lru(&uri_str);
209            return Ok(Some(obj));
210        }
211
212        // Check cold tier and promote on hit
213        if let Some(obj) = self.cold.get(uri)? {
214            // Promote to hot tier
215            if self.lru.len() >= self.max_hot {
216                self.evict_lru()?;
217            }
218            let opts = PutOptions {
219                content_type: Some(obj.info.content_type.clone()),
220                ..Default::default()
221            };
222            self.hot.put(uri, &obj.data, Some(opts))?;
223            self.touch_lru(&uri_str);
224            return Ok(Some(obj));
225        }
226
227        Ok(None)
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use grafos_std::host;
235
236    fn setup() {
237        host::reset_mock();
238        host::mock_set_fbmu_arena_size(1_048_576);
239        host::mock_set_fbbu_num_blocks(1024);
240    }
241
242    #[test]
243    fn hot_tier_put_get() {
244        setup();
245        let mut store = TieredObjectStore::new(64, 256, 16).unwrap();
246        let uri: FabricUri = "fabric://p/b/hot".parse().unwrap();
247
248        store.put(&uri, b"fast", None).unwrap();
249        let obj = store.get_mut(&uri).unwrap().unwrap();
250        assert_eq!(obj.data, b"fast");
251    }
252
253    #[test]
254    fn lru_eviction_to_cold() {
255        setup();
256        // max_hot = 2 to force eviction quickly
257        let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
258
259        let uri1: FabricUri = "fabric://p/b/obj1".parse().unwrap();
260        let uri2: FabricUri = "fabric://p/b/obj2".parse().unwrap();
261        let uri3: FabricUri = "fabric://p/b/obj3".parse().unwrap();
262
263        store.put(&uri1, b"one", None).unwrap();
264        store.put(&uri2, b"two", None).unwrap();
265        // This should evict uri1 (LRU) to cold tier
266        store.put(&uri3, b"three", None).unwrap();
267
268        // uri1 should still be accessible via cold tier
269        let obj = store.get_mut(&uri1).unwrap().unwrap();
270        assert_eq!(obj.data, b"one");
271
272        // uri3 should be in hot tier
273        let obj3 = store.get_mut(&uri3).unwrap().unwrap();
274        assert_eq!(obj3.data, b"three");
275    }
276
277    #[test]
278    fn cold_promotion_on_read() {
279        setup();
280        let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
281
282        let uri1: FabricUri = "fabric://p/b/a".parse().unwrap();
283        let uri2: FabricUri = "fabric://p/b/b".parse().unwrap();
284        let uri3: FabricUri = "fabric://p/b/c".parse().unwrap();
285
286        store.put(&uri1, b"1", None).unwrap();
287        store.put(&uri2, b"2", None).unwrap();
288        store.put(&uri3, b"3", None).unwrap(); // evicts uri1 to cold
289
290        // Read uri1 — should promote back to hot
291        let obj = store.get_mut(&uri1).unwrap().unwrap();
292        assert_eq!(obj.data, b"1");
293    }
294
295    #[test]
296    fn delete_from_both_tiers() {
297        setup();
298        let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
299
300        let uri1: FabricUri = "fabric://p/b/d1".parse().unwrap();
301        let uri2: FabricUri = "fabric://p/b/d2".parse().unwrap();
302        let uri3: FabricUri = "fabric://p/b/d3".parse().unwrap();
303
304        store.put(&uri1, b"1", None).unwrap();
305        store.put(&uri2, b"2", None).unwrap();
306        store.put(&uri3, b"3", None).unwrap(); // evicts uri1 to cold
307
308        // Delete uri1 (in cold) and uri3 (in hot)
309        assert!(store.delete(&uri1).unwrap());
310        assert!(store.delete(&uri3).unwrap());
311
312        assert!(store.get_mut(&uri1).unwrap().is_none());
313        assert!(store.get_mut(&uri3).unwrap().is_none());
314    }
315
316    #[test]
317    fn list_merges_tiers() {
318        setup();
319        let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
320
321        let uri1: FabricUri = "fabric://p/b/x1".parse().unwrap();
322        let uri2: FabricUri = "fabric://p/b/x2".parse().unwrap();
323        let uri3: FabricUri = "fabric://p/b/x3".parse().unwrap();
324
325        store.put(&uri1, b"1", None).unwrap();
326        store.put(&uri2, b"2", None).unwrap();
327        store.put(&uri3, b"3", None).unwrap(); // evicts uri1 to cold
328
329        let keys = store.list("p", "b", "x").unwrap();
330        assert_eq!(keys.len(), 3);
331    }
332
333    #[test]
334    fn head_checks_both_tiers() {
335        setup();
336        let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
337
338        let uri1: FabricUri = "fabric://p/b/h1".parse().unwrap();
339        let uri2: FabricUri = "fabric://p/b/h2".parse().unwrap();
340        let uri3: FabricUri = "fabric://p/b/h3".parse().unwrap();
341
342        store.put(&uri1, b"1", None).unwrap();
343        store.put(&uri2, b"2", None).unwrap();
344        store.put(&uri3, b"3", None).unwrap();
345
346        // uri1 is in cold tier, should still be visible via head
347        let info = store.head(&uri1).unwrap().unwrap();
348        assert_eq!(info.size, 1);
349    }
350}