grafos_store/
block_store.rs

//! Durable object store backed by block storage.
//!
//! Layout:
//! - Block 0: Superblock (magic, version, object count, next data byte offset)
//! - Blocks 1..N: Object data (append-only, written back to back)
//! - In-memory index maps URIs to (byte offset, length) pairs
//! - Index checkpoint: serialized to the blocks reserved at the end of the device
8
9extern crate alloc;
10use alloc::string::String;
11use alloc::vec;
12use alloc::vec::Vec;
13
14use grafos_std::block::{BlockBuilder, BlockLease, BLOCK_SIZE};
15use grafos_std::error::{FabricError, Result};
16use grafos_std::host;
17
18use serde::{Deserialize, Serialize};
19
20use crate::crc::crc32;
21#[cfg(feature = "versioning")]
22use crate::meta::VersionInfo;
23use crate::meta::{ObjectInfo, ObjectMeta, PutOptions};
24use crate::store::{ObjectData, ObjectStore};
25use crate::uri::FabricUri;
26
/// Magic bytes stored in the first four bytes of the superblock (block 0).
const SUPER_MAGIC: [u8; 4] = *b"GOST"; // GrafOS STore
/// Superblock format version, stored little-endian in bytes 4..8 of block 0.
const SUPER_VERSION: u32 = 1;
29
/// Index record locating one stored object (or one version snapshot).
///
/// No per-record header is written to the data region: `put` appends only the
/// raw payload bytes, and everything needed to find and validate them lives
/// in this entry.
///
/// The whole index is serialized with postcard by `checkpoint_index`.
/// NOTE(review): presumably the field order is therefore part of the
/// checkpoint format — confirm before reordering fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexEntry {
    /// Rendered object URI; version snapshots use `"{uri}#v/{version}"`.
    uri: String,
    /// Byte offset of the payload on the device (despite the name, this is
    /// the value returned by `write_raw`, not a block address).
    lba: u64,
    /// Payload length in bytes.
    total_bytes: u64,
    /// Object metadata, including the CRC32 checked on read.
    meta: ObjectMeta,
}
42
/// Durable object store backed by leased block storage.
///
/// Object payloads are appended back to back, starting at the first byte
/// after the superblock. An in-memory index records where each URI's bytes
/// live. `checkpoint_index` serializes that index into the blocks reserved at
/// the end of the device; none of the `ObjectStore` methods visible here call
/// it implicitly.
///
/// Deleting an object removes its entry from the in-memory index; the bytes
/// it occupied are not reclaimed (the data region is append-only).
pub struct BlockObjectStore {
    /// Lease on the backing block device; all I/O goes through `lease.block()`.
    lease: BlockLease,
    /// Next free byte offset for data writes (in bytes, not blocks).
    write_cursor: u64,
    /// In-memory index, searched linearly by URI string.
    index: Vec<IndexEntry>,
    /// Block range reserved for index checkpoint: [index_start_lba, num_blocks).
    index_start_lba: u64,
}
60
61impl BlockObjectStore {
62    /// Create a new block object store with the given number of blocks.
63    ///
64    /// Reserves the last 10% of blocks (minimum 4) for index checkpoints.
65    pub fn new(num_blocks: u64) -> Result<Self> {
66        let lease = BlockBuilder::new().min_blocks(num_blocks).acquire()?;
67        let total_blocks = lease.block().num_blocks();
68
69        // Reserve last 10% (min 4 blocks) for index
70        let index_blocks = core::cmp::max(4, total_blocks / 10);
71        let index_start_lba = total_blocks - index_blocks;
72
73        // Write superblock
74        let mut sb = [0u8; BLOCK_SIZE];
75        sb[0..4].copy_from_slice(&SUPER_MAGIC);
76        sb[4..8].copy_from_slice(&SUPER_VERSION.to_le_bytes());
77        sb[8..16].copy_from_slice(&0u64.to_le_bytes()); // object count
78                                                        // Data starts at byte offset = BLOCK_SIZE (after superblock)
79        let data_start = BLOCK_SIZE as u64;
80        sb[16..24].copy_from_slice(&data_start.to_le_bytes()); // next data offset
81        lease.block().write_block(0, &sb)?;
82
83        Ok(BlockObjectStore {
84            lease,
85            write_cursor: data_start,
86            index: Vec::new(),
87            index_start_lba,
88        })
89    }
90
91    /// Maximum data byte offset (beginning of index region).
92    fn max_data_offset(&self) -> u64 {
93        self.index_start_lba * BLOCK_SIZE as u64
94    }
95
96    /// Write raw bytes at the current cursor position across block boundaries.
97    fn write_raw(&mut self, data: &[u8]) -> Result<u64> {
98        let start = self.write_cursor;
99        if start + data.len() as u64 > self.max_data_offset() {
100            return Err(FabricError::CapacityExceeded);
101        }
102
103        let mut offset = start;
104        let mut remaining = data;
105        while !remaining.is_empty() {
106            let block_idx = offset / BLOCK_SIZE as u64;
107            let block_offset = (offset % BLOCK_SIZE as u64) as usize;
108            let space = BLOCK_SIZE - block_offset;
109            let chunk_len = remaining.len().min(space);
110
111            let mut block = if block_offset == 0 && chunk_len == BLOCK_SIZE {
112                [0u8; BLOCK_SIZE]
113            } else {
114                self.lease.block().read_block(block_idx)?
115            };
116            block[block_offset..block_offset + chunk_len].copy_from_slice(&remaining[..chunk_len]);
117            self.lease.block().write_block(block_idx, &block)?;
118
119            remaining = &remaining[chunk_len..];
120            offset += chunk_len as u64;
121        }
122
123        self.write_cursor = offset;
124        Ok(start)
125    }
126
127    /// Read raw bytes from a given offset.
128    fn read_raw(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
129        let mut buf = vec![0u8; len];
130        let mut buf_offset = 0;
131        let mut disk_offset = offset;
132
133        while buf_offset < len {
134            let block_idx = disk_offset / BLOCK_SIZE as u64;
135            let block_off = (disk_offset % BLOCK_SIZE as u64) as usize;
136            let space = BLOCK_SIZE - block_off;
137            let chunk_len = (len - buf_offset).min(space);
138
139            let block = self.lease.block().read_block(block_idx)?;
140            buf[buf_offset..buf_offset + chunk_len]
141                .copy_from_slice(&block[block_off..block_off + chunk_len]);
142
143            buf_offset += chunk_len;
144            disk_offset += chunk_len as u64;
145        }
146
147        Ok(buf)
148    }
149
150    /// Update the superblock with the current object count and write cursor.
151    fn update_superblock(&self) -> Result<()> {
152        let mut sb = [0u8; BLOCK_SIZE];
153        sb[0..4].copy_from_slice(&SUPER_MAGIC);
154        sb[4..8].copy_from_slice(&SUPER_VERSION.to_le_bytes());
155        sb[8..16].copy_from_slice(&(self.index.len() as u64).to_le_bytes());
156        sb[16..24].copy_from_slice(&self.write_cursor.to_le_bytes());
157        self.lease.block().write_block(0, &sb)
158    }
159
160    /// Checkpoint the in-memory index to the reserved block region.
161    pub fn checkpoint_index(&self) -> Result<()> {
162        let data = postcard::to_allocvec(&self.index).map_err(|_| FabricError::IoError(-1))?;
163        let data_len = data.len() as u64;
164
165        // Write header at index_start_lba
166        let mut header = [0u8; BLOCK_SIZE];
167        header[0..4].copy_from_slice(b"GIDX"); // GrafOS InDeX
168        header[4..8].copy_from_slice(&1u32.to_le_bytes()); // version
169        header[8..16].copy_from_slice(&data_len.to_le_bytes());
170        self.lease
171            .block()
172            .write_block(self.index_start_lba, &header)?;
173
174        // Write index data blocks
175        let num_data_blocks = data.len().div_ceil(BLOCK_SIZE);
176        let available = self.lease.block().num_blocks() - self.index_start_lba - 1;
177        if num_data_blocks as u64 > available {
178            return Err(FabricError::CapacityExceeded);
179        }
180
181        for i in 0..num_data_blocks {
182            let start = i * BLOCK_SIZE;
183            let end = core::cmp::min(start + BLOCK_SIZE, data.len());
184            let mut block = [0u8; BLOCK_SIZE];
185            block[..end - start].copy_from_slice(&data[start..end]);
186            self.lease
187                .block()
188                .write_block(self.index_start_lba + 1 + i as u64, &block)?;
189        }
190
191        self.update_superblock()
192    }
193
194    fn find_index(&self, uri: &str) -> Option<usize> {
195        self.index.iter().position(|e| e.uri == uri)
196    }
197}
198
199impl ObjectStore for BlockObjectStore {
200    fn put(&mut self, uri: &FabricUri, data: &[u8], opts: Option<PutOptions>) -> Result<()> {
201        let opts = opts.unwrap_or_default();
202        let checksum = crc32(data);
203        let now = host::unix_time_secs();
204
205        #[cfg(feature = "versioning")]
206        let next_version = {
207            let uri_str = uri.to_string();
208            match self.find_index(&uri_str) {
209                Some(idx) => self.index[idx].meta.version + 1,
210                None => 1,
211            }
212        };
213
214        let content_type = opts
215            .content_type
216            .unwrap_or_else(|| String::from("application/octet-stream"));
217
218        // Write data at current cursor
219        let lba = self.write_raw(data)?;
220
221        let meta = ObjectMeta {
222            content_type,
223            crc32: checksum,
224            size: data.len() as u64,
225            created_at: now,
226            tags: opts.tags,
227            #[cfg(feature = "versioning")]
228            version: next_version,
229        };
230
231        let entry = IndexEntry {
232            uri: uri.to_string(),
233            lba,
234            total_bytes: data.len() as u64,
235            meta,
236        };
237
238        // Update or insert in index
239        let uri_str = uri.to_string();
240        if let Some(idx) = self.find_index(&uri_str) {
241            self.index[idx] = entry.clone();
242        } else {
243            self.index.push(entry.clone());
244        }
245
246        // Store a version snapshot in the index
247        #[cfg(feature = "versioning")]
248        {
249            let ver_uri = alloc::format!("{}#v/{}", uri, next_version);
250            let ver_entry = IndexEntry {
251                uri: ver_uri,
252                lba: entry.lba,
253                total_bytes: entry.total_bytes,
254                meta: entry.meta,
255            };
256            self.index.push(ver_entry);
257        }
258
259        Ok(())
260    }
261
262    fn get(&self, uri: &FabricUri) -> Result<Option<ObjectData>> {
263        let uri_str = uri.to_string();
264        let idx = match self.find_index(&uri_str) {
265            Some(i) => i,
266            None => return Ok(None),
267        };
268
269        let entry = &self.index[idx];
270        let data = self.read_raw(entry.lba, entry.total_bytes as usize)?;
271
272        // Verify CRC32
273        let computed = crc32(&data);
274        if computed != entry.meta.crc32 {
275            return Err(FabricError::IoError(-5));
276        }
277
278        let info = ObjectInfo::from_meta(&entry.meta);
279        Ok(Some(ObjectData { data, info }))
280    }
281
282    fn head(&self, uri: &FabricUri) -> Result<Option<ObjectInfo>> {
283        let uri_str = uri.to_string();
284        match self.find_index(&uri_str) {
285            Some(idx) => Ok(Some(ObjectInfo::from_meta(&self.index[idx].meta))),
286            None => Ok(None),
287        }
288    }
289
290    fn delete(&mut self, uri: &FabricUri) -> Result<bool> {
291        let uri_str = uri.to_string();
292        match self.find_index(&uri_str) {
293            Some(idx) => {
294                self.index.remove(idx);
295                Ok(true)
296            }
297            None => Ok(false),
298        }
299    }
300
301    fn list(&self, pool: &str, bucket: &str, prefix: &str) -> Result<Vec<String>> {
302        let mut keys = Vec::new();
303        for entry in &self.index {
304            // Skip version entries
305            if entry.uri.contains("#v/") {
306                continue;
307            }
308            if let Ok(uri) = entry.uri.parse::<FabricUri>() {
309                if uri.pool() == pool
310                    && uri.bucket() == bucket
311                    && (prefix.is_empty() || uri.key().starts_with(prefix))
312                {
313                    keys.push(String::from(uri.key()));
314                }
315            }
316        }
317        keys.sort();
318        Ok(keys)
319    }
320
321    #[cfg(feature = "versioning")]
322    fn get_version(&self, uri: &FabricUri, version: u64) -> Result<Option<ObjectData>> {
323        let ver_uri = alloc::format!("{}#v/{}", uri, version);
324        let idx = match self.find_index(&ver_uri) {
325            Some(i) => i,
326            None => return Ok(None),
327        };
328
329        let entry = &self.index[idx];
330        let data = self.read_raw(entry.lba, entry.total_bytes as usize)?;
331
332        let computed = crc32(&data);
333        if computed != entry.meta.crc32 {
334            return Err(FabricError::IoError(-5));
335        }
336
337        let info = ObjectInfo::from_meta(&entry.meta);
338        Ok(Some(ObjectData { data, info }))
339    }
340
341    #[cfg(feature = "versioning")]
342    fn list_versions(&self, pool: &str, bucket: &str, key: &str) -> Result<Vec<VersionInfo>> {
343        let uri = FabricUri::new(pool, bucket, key)?;
344        let prefix = alloc::format!("{}#v/", uri);
345        let mut versions = Vec::new();
346        for entry in &self.index {
347            if entry.uri.starts_with(&prefix) {
348                versions.push(VersionInfo {
349                    version: entry.meta.version,
350                    size: entry.meta.size,
351                    created_at: entry.meta.created_at,
352                });
353            }
354        }
355        versions.sort_by_key(|v| v.version);
356        Ok(versions)
357    }
358
359    #[cfg(feature = "versioning")]
360    fn delete_version(&mut self, uri: &FabricUri, version: u64) -> Result<bool> {
361        let ver_uri = alloc::format!("{}#v/{}", uri, version);
362        match self.find_index(&ver_uri) {
363            Some(idx) => {
364                self.index.remove(idx);
365                Ok(true)
366            }
367            None => Ok(false),
368        }
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Reset the mock host and give it a 1024-block device.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(1024);
    }

    /// Fresh mock environment plus a store requesting 256 blocks.
    fn fresh_store() -> BlockObjectStore {
        setup();
        BlockObjectStore::new(256).unwrap()
    }

    /// Parse a URI literal, panicking on malformed input.
    fn parse_uri(raw: &str) -> FabricUri {
        raw.parse().unwrap()
    }

    #[test]
    fn put_get_roundtrip() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/hello");

        store.put(&uri, b"world", None).unwrap();

        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data, b"world");
        assert_eq!(fetched.info.size, 5);
    }

    #[test]
    fn delete_removes_from_index() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/del");

        store.put(&uri, b"gone", None).unwrap();

        let removed = store.delete(&uri).unwrap();
        assert!(removed);
        let after = store.get(&uri).unwrap();
        assert!(after.is_none());
    }

    #[test]
    fn list_with_prefix() {
        let mut store = fresh_store();

        let fixtures: [(&str, &[u8]); 3] = [
            ("fabric://p/b/logs/a", b"1"),
            ("fabric://p/b/logs/b", b"2"),
            ("fabric://p/b/data/c", b"3"),
        ];
        for &(raw, body) in &fixtures {
            store.put(&parse_uri(raw), body, None).unwrap();
        }

        let logs = store.list("p", "b", "logs/").unwrap();
        assert_eq!(logs, vec!["logs/a", "logs/b"]);
    }

    #[test]
    fn checkpoint_preserves_state() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/ck");

        store.put(&uri, b"persistent", None).unwrap();
        store.checkpoint_index().unwrap();

        // The checkpoint must not disturb the live store: the object is
        // still readable through the same instance afterwards.
        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data, b"persistent");
    }

    #[test]
    fn overwrite_updates() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/ow");

        for body in [&b"first"[..], &b"second"[..]].iter() {
            store.put(&uri, body, None).unwrap();
        }

        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data, b"second");
    }

    #[test]
    fn large_object_across_blocks() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/big");

        // Intended to span several blocks (the original comment assumes
        // 512-byte blocks; BLOCK_SIZE is defined outside this file).
        let payload = vec![0xABu8; 2048];
        store.put(&uri, &payload, None).unwrap();

        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data.len(), 2048);
        assert_eq!(fetched.data, payload);
    }
}