1extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use grafos_collections::map::FabricHashMap;
8use grafos_std::error::{FabricError, Result};
9use grafos_std::host;
10
11use crate::crc::crc32;
12#[cfg(feature = "versioning")]
13use crate::meta::VersionInfo;
14use crate::meta::{ObjectInfo, ObjectMeta, PutOptions};
15use crate::store::{ObjectData, ObjectStore};
16use crate::uri::FabricUri;
17
/// Key stride handed to `FabricHashMap::with_capacity` when the store is created.
///
/// NOTE(review): presumably the per-slot byte budget for a serialized key
/// (a URI string, optionally with a `#chunk/N` or `#v/N` suffix) — confirm
/// against the `grafos_collections` documentation.
const KEY_STRIDE: usize = 256;

/// Value stride handed to `FabricHashMap::with_capacity` when the store is created.
///
/// NOTE(review): presumably the per-slot byte budget for a serialized
/// `StoreEntry`; it is larger than the 1024-byte chunk payload limit used by
/// this module, which would leave room for metadata — confirm.
const VALUE_STRIDE: usize = 1536;
23
24#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
26struct StoreEntry {
27 meta: ObjectMeta,
28 data: Vec<u8>,
29}
30
31pub struct MemObjectStore {
53 map: FabricHashMap<String, StoreEntry>,
54}
55
/// Largest payload, in bytes, that is stored inline in a root entry.
///
/// Payloads longer than this are split into pieces of at most this many bytes,
/// each stored under its own `#chunk/<index>` key.
const CHUNK_SIZE: usize = 1024;
60
61impl MemObjectStore {
62 pub fn new(buckets: usize) -> Result<Self> {
65 let map = FabricHashMap::with_capacity(buckets, KEY_STRIDE, VALUE_STRIDE)?;
66 Ok(MemObjectStore { map })
67 }
68
69 fn uri_key(uri: &FabricUri) -> String {
70 uri.to_string()
71 }
72
73 fn chunk_key(uri: &FabricUri, chunk_idx: usize) -> String {
74 alloc::format!("{}#chunk/{}", uri, chunk_idx)
75 }
76
77 #[cfg(feature = "versioning")]
78 fn version_key(uri: &FabricUri, version: u64) -> String {
79 alloc::format!("{}#v/{}", uri, version)
80 }
81}
82
83impl ObjectStore for MemObjectStore {
84 fn put(&mut self, uri: &FabricUri, data: &[u8], opts: Option<PutOptions>) -> Result<()> {
85 let opts = opts.unwrap_or_default();
86 let checksum = crc32(data);
87 let now = host::unix_time_secs();
88
89 #[cfg(feature = "versioning")]
91 let next_version = {
92 let key = Self::uri_key(uri);
93 match self.map.get(&key)? {
94 Some(existing) => existing.meta.version + 1,
95 None => 1,
96 }
97 };
98
99 let content_type = opts
100 .content_type
101 .unwrap_or_else(|| String::from("application/octet-stream"));
102
103 if data.len() <= CHUNK_SIZE {
104 let meta = ObjectMeta {
105 content_type,
106 crc32: checksum,
107 size: data.len() as u64,
108 created_at: now,
109 tags: opts.tags,
110 #[cfg(feature = "versioning")]
111 version: next_version,
112 };
113 let entry = StoreEntry {
114 meta,
115 data: data.to_vec(),
116 };
117 let key = Self::uri_key(uri);
118 self.map.insert(&key, &entry)?;
119
120 #[cfg(feature = "versioning")]
121 {
122 let vk = Self::version_key(uri, next_version);
123 let _ = self.map.insert(&vk, &entry);
124 }
125 } else {
126 let num_chunks = data.len().div_ceil(CHUNK_SIZE);
128 let meta = ObjectMeta {
129 content_type,
130 crc32: checksum,
131 size: data.len() as u64,
132 created_at: now,
133 tags: opts.tags,
134 #[cfg(feature = "versioning")]
135 version: next_version,
136 };
137 let root = StoreEntry {
139 meta,
140 data: Vec::new(),
141 };
142 let key = Self::uri_key(uri);
143 self.map.insert(&key, &root)?;
144
145 for i in 0..num_chunks {
146 let start = i * CHUNK_SIZE;
147 let end = core::cmp::min(start + CHUNK_SIZE, data.len());
148 let chunk_entry = StoreEntry {
149 meta: ObjectMeta {
150 content_type: String::new(),
151 crc32: 0,
152 size: (end - start) as u64,
153 created_at: now,
154 tags: Vec::new(),
155 #[cfg(feature = "versioning")]
156 version: 0,
157 },
158 data: data[start..end].to_vec(),
159 };
160 let ck = Self::chunk_key(uri, i);
161 self.map.insert(&ck, &chunk_entry)?;
162 }
163
164 #[cfg(feature = "versioning")]
165 {
166 let vk = Self::version_key(uri, next_version);
167 let ver_entry = StoreEntry {
168 meta: root.meta.clone(),
169 data: data.to_vec(),
170 };
171 let _ = self.map.insert(&vk, &ver_entry);
172 }
173 }
174
175 #[cfg(feature = "observe")]
176 crate::observe_hooks::on_object_put(uri.bucket(), uri.key(), data.len() as u64);
177
178 Ok(())
179 }
180
181 fn get(&self, uri: &FabricUri) -> Result<Option<ObjectData>> {
182 let key = Self::uri_key(uri);
183 let entry = match self.map.get(&key)? {
184 Some(e) => e,
185 None => return Ok(None),
186 };
187
188 let data = if entry.data.is_empty() && entry.meta.size > 0 {
189 let num_chunks = (entry.meta.size as usize).div_ceil(CHUNK_SIZE);
191 let mut assembled = Vec::with_capacity(entry.meta.size as usize);
192 for i in 0..num_chunks {
193 let ck = Self::chunk_key(uri, i);
194 let chunk = self.map.get(&ck)?.ok_or(FabricError::IoError(-4))?;
195 assembled.extend_from_slice(&chunk.data);
196 }
197 assembled
198 } else {
199 entry.data.clone()
200 };
201
202 let computed = crc32(&data);
204 if computed != entry.meta.crc32 {
205 return Err(FabricError::IoError(-5));
206 }
207
208 let info = ObjectInfo::from_meta(&entry.meta);
209
210 #[cfg(feature = "observe")]
211 crate::observe_hooks::on_object_get(uri.bucket(), uri.key(), data.len() as u64);
212
213 Ok(Some(ObjectData { data, info }))
214 }
215
216 fn head(&self, uri: &FabricUri) -> Result<Option<ObjectInfo>> {
217 let key = Self::uri_key(uri);
218 match self.map.get(&key)? {
219 Some(entry) => Ok(Some(ObjectInfo::from_meta(&entry.meta))),
220 None => Ok(None),
221 }
222 }
223
224 fn delete(&mut self, uri: &FabricUri) -> Result<bool> {
225 let key = Self::uri_key(uri);
226 let entry = self.map.remove(&key)?;
227 if let Some(e) = &entry {
228 if e.data.is_empty() && e.meta.size > 0 {
230 let num_chunks = (e.meta.size as usize).div_ceil(CHUNK_SIZE);
231 for i in 0..num_chunks {
232 let ck = Self::chunk_key(uri, i);
233 let _ = self.map.remove(&ck);
234 }
235 }
236 }
237 Ok(entry.is_some())
238 }
239
240 fn list(&self, _pool: &str, _bucket: &str, prefix: &str) -> Result<Vec<String>> {
241 let match_prefix = if prefix.is_empty() {
242 String::new()
243 } else {
244 String::from(prefix)
245 };
246
247 let mut keys = Vec::new();
248 for item in self.map.iter() {
249 let (key_str, _): (String, StoreEntry) = item?;
250 if key_str.contains("#chunk/") || key_str.contains("#v/") {
252 continue;
253 }
254 if let Ok(uri) = key_str.parse::<FabricUri>() {
256 if uri.pool() == _pool
257 && uri.bucket() == _bucket
258 && (match_prefix.is_empty() || uri.key().starts_with(&match_prefix))
259 {
260 keys.push(String::from(uri.key()));
261 }
262 }
263 }
264 keys.sort();
265 Ok(keys)
266 }
267
268 #[cfg(feature = "versioning")]
269 fn get_version(&self, uri: &FabricUri, version: u64) -> Result<Option<ObjectData>> {
270 let vk = Self::version_key(uri, version);
271 let entry = match self.map.get(&vk)? {
272 Some(e) => e,
273 None => return Ok(None),
274 };
275
276 let info = ObjectInfo::from_meta(&entry.meta);
277 Ok(Some(ObjectData {
278 data: entry.data.clone(),
279 info,
280 }))
281 }
282
283 #[cfg(feature = "versioning")]
284 fn list_versions(&self, pool: &str, bucket: &str, key: &str) -> Result<Vec<VersionInfo>> {
285 let uri = FabricUri::new(pool, bucket, key)?;
286 let prefix = alloc::format!("{}#v/", uri);
287 let mut versions = Vec::new();
288 for item in self.map.iter() {
289 let (key_str, entry): (String, StoreEntry) = item?;
290 if key_str.starts_with(&prefix) {
291 versions.push(VersionInfo {
292 version: entry.meta.version,
293 size: entry.meta.size,
294 created_at: entry.meta.created_at,
295 });
296 }
297 }
298 versions.sort_by_key(|v| v.version);
299 Ok(versions)
300 }
301
302 #[cfg(feature = "versioning")]
303 fn delete_version(&mut self, uri: &FabricUri, version: u64) -> Result<bool> {
304 let vk = Self::version_key(uri, version);
305 let removed = self.map.remove(&vk)?;
306 Ok(removed.is_some())
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Resets the mocked host and sets its arena size before each test.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1_048_576);
    }

    /// Prepares the mocked host and returns an empty 64-bucket store.
    fn fresh_store() -> MemObjectStore {
        setup();
        MemObjectStore::new(64).unwrap()
    }

    /// Parses a URI literal, panicking when it is malformed.
    fn uri(text: &str) -> FabricUri {
        text.parse().unwrap()
    }

    #[test]
    fn put_get_roundtrip() {
        let mut store = fresh_store();
        let target = uri("fabric://p/b/hello");

        store.put(&target, b"world", None).unwrap();

        let fetched = store.get(&target).unwrap().unwrap();
        assert_eq!(fetched.data, b"world");
        assert_eq!(fetched.info.size, 5);
        assert_eq!(fetched.info.content_type, "application/octet-stream");
    }

    #[test]
    fn head_returns_meta() {
        let mut store = fresh_store();
        let target = uri("fabric://p/b/k");

        // Nothing stored yet.
        assert!(store.head(&target).unwrap().is_none());

        store.put(&target, b"data", None).unwrap();

        let info = store.head(&target).unwrap().unwrap();
        assert_eq!(info.size, 4);
        assert_eq!(info.crc32, crc32(b"data"));
    }

    #[test]
    fn delete_removes_object() {
        let mut store = fresh_store();
        let target = uri("fabric://p/b/del");

        store.put(&target, b"gone", None).unwrap();

        assert!(store.delete(&target).unwrap());
        assert!(store.get(&target).unwrap().is_none());
        // A second delete finds nothing to remove.
        assert!(!store.delete(&target).unwrap());
    }

    #[test]
    fn list_prefix() {
        let mut store = fresh_store();

        store.put(&uri("fabric://p/b/logs/a"), b"1", None).unwrap();
        store.put(&uri("fabric://p/b/logs/b"), b"2", None).unwrap();
        store.put(&uri("fabric://p/b/data/c"), b"3", None).unwrap();

        let logs = store.list("p", "b", "logs/").unwrap();
        assert_eq!(logs, vec!["logs/a", "logs/b"]);

        let all = store.list("p", "b", "").unwrap();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn overwrite_updates_data() {
        let mut store = fresh_store();
        let target = uri("fabric://p/b/ow");

        store.put(&target, b"first", None).unwrap();
        store.put(&target, b"second", None).unwrap();

        let fetched = store.get(&target).unwrap().unwrap();
        assert_eq!(fetched.data, b"second");
    }

    #[test]
    fn custom_content_type() {
        let mut store = fresh_store();
        let target = uri("fabric://p/b/ct");

        let opts = PutOptions {
            content_type: Some(String::from("text/plain")),
            ..Default::default()
        };
        store.put(&target, b"hello", Some(opts)).unwrap();

        let info = store.head(&target).unwrap().unwrap();
        assert_eq!(info.content_type, "text/plain");
    }

    #[test]
    fn crc32_integrity_verified() {
        let mut store = fresh_store();
        let target = uri("fabric://p/b/crc");

        store.put(&target, b"check me", None).unwrap();

        let fetched = store.get(&target).unwrap().unwrap();
        assert_eq!(fetched.info.crc32, crc32(b"check me"));
    }
}
420}