grafos_registry/
lib.rs

1//! Fabric-wide service registry for grafOS.
2//!
3//! [`RegistryRegion`] stores service registrations in a `FabricHashMap` backed
4//! by leased fabric memory. Services register via [`RegistryWriter`] and are
5//! discovered via [`RegistryReader`]. Change detection uses a monotonic version
6//! counter exposed through [`RegistryWatcher`].
7//!
8//! # Quick start
9//!
10//! ```rust
11//! use grafos_registry::{RegistryBuilder, ServiceRegistration, ServiceEndpoint, HealthStatus};
12//!
13//! # grafos_std::host::reset_mock();
14//! # grafos_std::host::mock_set_fbmu_arena_size(1048576);
15//! let (mut writer, _reader) = RegistryBuilder::new()
16//!     .capacity(64)
17//!     .build()?;
18//!
19//! let reg = ServiceRegistration::new("my-service", "1.0.0", 1)
20//!     .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080))
21//!     .with_tag("region", "us-east");
22//!
23//! writer.register(reg)?;
24//!
25//! let results = writer.lookup("my-service")?;
26//! assert_eq!(results.len(), 1);
27//! # Ok::<(), grafos_std::FabricError>(())
28//! ```
29//!
30//! # Feature flags
31//!
32//! | Feature | Default | Effect |
33//! |---------|---------|--------|
34//! | `std` | Yes | Enables `std` in grafos-std |
35//! | `observe` | No | Enables grafos-observe metrics |
36//! | `rpc` | No | Adds grafos-rpc RpcResolver integration |
37//! | `durable` | No | Cold-start checkpoint to block storage |
38
39#![cfg_attr(not(feature = "std"), no_std)]
40
41extern crate alloc;
42
43mod endpoint;
44mod filter;
45mod reader;
46mod region;
47mod registration;
48mod watcher;
49mod writer;
50
51#[cfg(feature = "observe")]
52pub mod observe_hooks;
53
54#[cfg(feature = "rpc")]
55pub mod rpc_resolver;
56
57pub use endpoint::ServiceEndpoint;
58pub use filter::RegistryFilter;
59pub use reader::RegistryReader;
60pub use region::RegistryRegion;
61pub use registration::{HealthStatus, ServiceRegistration};
62pub use watcher::RegistryWatcher;
63pub use writer::RegistryWriter;
64
65use grafos_std::error::Result;
66
/// Upper bound on the serialized size of a service-name key in the registry map.
const NAME_STRIDE: usize = 1 << 7;
/// Upper bound on the serialized size of a registration slot-list value.
const SLOT_STRIDE: usize = 4 * 1024;
71
/// Builder for creating a registry writer/reader pair.
///
/// Every setting has a default (see [`RegistryBuilder::new`]), so only the
/// values that differ need to be set before calling
/// [`build`](RegistryBuilder::build).
///
/// # Example
///
/// ```rust
/// use grafos_registry::RegistryBuilder;
///
/// # grafos_std::host::reset_mock();
/// # grafos_std::host::mock_set_fbmu_arena_size(1048576);
/// let (writer, reader) = RegistryBuilder::new()
///     .capacity(32)
///     .default_ttl_secs(300)
///     .build()?;
/// # Ok::<(), grafos_std::FabricError>(())
/// ```
#[derive(Debug, Clone)]
pub struct RegistryBuilder {
    /// Number of hash map buckets for the registry.
    capacity: usize,
    /// Default TTL, in seconds, for registrations.
    default_ttl_secs: u32,
    /// Maximum serialized service-name size.
    name_stride: usize,
    /// Maximum serialized slot-list size.
    slot_stride: usize,
}
93
94impl RegistryBuilder {
95    /// Create a new builder with default settings.
96    pub fn new() -> Self {
97        RegistryBuilder {
98            capacity: 64,
99            default_ttl_secs: 300,
100            name_stride: NAME_STRIDE,
101            slot_stride: SLOT_STRIDE,
102        }
103    }
104
105    /// Set the number of hash map buckets for the registry.
106    pub fn capacity(mut self, n: usize) -> Self {
107        self.capacity = n;
108        self
109    }
110
111    /// Set the default TTL in seconds for registrations.
112    pub fn default_ttl_secs(mut self, secs: u32) -> Self {
113        self.default_ttl_secs = secs;
114        self
115    }
116
117    /// Set the name stride (max serialized service name size).
118    pub fn name_stride(mut self, stride: usize) -> Self {
119        self.name_stride = stride;
120        self
121    }
122
123    /// Set the slot stride (max serialized slot list size).
124    pub fn slot_stride(mut self, stride: usize) -> Self {
125        self.slot_stride = stride;
126        self
127    }
128
129    /// Build a ([`RegistryWriter`], [`RegistryReader`]) pair backed by a new
130    /// [`RegistryRegion`].
131    pub fn build(self) -> Result<(RegistryWriter, RegistryReader)> {
132        let region = RegistryRegion::new(
133            self.capacity,
134            self.default_ttl_secs,
135            self.name_stride,
136            self.slot_stride,
137        )?;
138        let writer = RegistryWriter::new(region);
139        // The reader shares the same underlying region via a second reference.
140        // In the fabric model, both writer and reader operate on the same
141        // leased memory region. Here we create a second RegistryRegion that
142        // points to the same mock arena.
143        let reader_region = RegistryRegion::new(
144            self.capacity,
145            self.default_ttl_secs,
146            self.name_stride,
147            self.slot_stride,
148        )?;
149        let reader = RegistryReader::new(reader_region);
150        Ok((writer, reader))
151    }
152}
153
154impl Default for RegistryBuilder {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;
    use alloc::vec;
    use grafos_std::host;

    /// Reset all host mock state and set the mock FBMU arena size to 1048576.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1048576);
    }

    /// Reset the mock, then open a 64-bucket region with the given default
    /// TTL and the crate's default name/slot strides.
    fn fresh_region(ttl_secs: u32) -> RegistryRegion {
        setup();
        RegistryRegion::new(64, ttl_secs, NAME_STRIDE, SLOT_STRIDE).unwrap()
    }

    #[test]
    fn register_and_discover_roundtrip() {
        let mut region = fresh_region(300);

        let registration = ServiceRegistration::new("api-gateway", "2.1.0", 1001)
            .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080))
            .with_tag("env", "prod");
        region.register(registration.clone()).unwrap();

        let found = region.lookup("api-gateway").unwrap();
        assert_eq!(found.len(), 1);
        let entry = &found[0];
        assert_eq!(entry.name, "api-gateway");
        assert_eq!(entry.version, "2.1.0");
        assert_eq!(entry.instance_id, 1001);
    }

    #[test]
    fn multi_instance_registration() {
        let mut region = fresh_region(300);

        for n in 0..3 {
            let instance = ServiceRegistration::new("worker", "1.0.0", 100 + n)
                .with_endpoint(ServiceEndpoint::net([10, 0, 0, n as u8 + 1], 9090));
            region.register(instance).unwrap();
        }

        assert_eq!(region.lookup("worker").unwrap().len(), 3);
    }

    #[test]
    fn explicit_deregister() {
        let mut region = fresh_region(300);

        region
            .register(ServiceRegistration::new("temp-svc", "1.0.0", 42))
            .unwrap();
        assert_eq!(region.lookup("temp-svc").unwrap().len(), 1);

        region.deregister("temp-svc", 42).unwrap();
        assert_eq!(region.lookup("temp-svc").unwrap().len(), 0);
    }

    #[test]
    fn auto_deregister_on_lease_expiry() {
        let mut region = fresh_region(10);

        region
            .register(ServiceRegistration::new("ephemeral", "1.0.0", 1))
            .unwrap();
        assert_eq!(region.lookup("ephemeral").unwrap().len(), 1);

        // Move the mock clock one second past the 10-second TTL.
        host::mock_advance_time_secs(11);

        assert_eq!(region.tick().unwrap(), 1);
        assert_eq!(region.lookup("ephemeral").unwrap().len(), 0);
    }

    #[test]
    fn health_status_changes() {
        let mut region = fresh_region(300);

        region
            .register(ServiceRegistration::new("db-proxy", "3.0.0", 7))
            .unwrap();

        let degraded = HealthStatus::Degraded {
            reason: String::from("high latency"),
        };
        region.set_health("db-proxy", 7, degraded).unwrap();
        let after_degrade = region.lookup("db-proxy").unwrap();
        assert!(matches!(after_degrade[0].health, HealthStatus::Degraded { .. }));

        region
            .set_health("db-proxy", 7, HealthStatus::Draining)
            .unwrap();
        let after_drain = region.lookup("db-proxy").unwrap();
        assert!(matches!(after_drain[0].health, HealthStatus::Draining));
    }

    #[test]
    fn tag_and_version_filtering() {
        let mut region = fresh_region(300);

        let east = ServiceRegistration::new("api", "1.0.0", 1)
            .with_tag("env", "prod")
            .with_tag("region", "us-east");
        let west = ServiceRegistration::new("api", "1.1.0", 2)
            .with_tag("env", "prod")
            .with_tag("region", "us-west");
        let staging = ServiceRegistration::new("api", "2.0.0", 3).with_tag("env", "staging");

        region.register(east).unwrap();
        region.register(west).unwrap();
        region.register(staging).unwrap();

        // Version prefix "1." selects 1.0.0 and 1.1.0.
        let by_version = RegistryFilter::new().version_prefix("1.");
        assert_eq!(region.lookup_filtered("api", &by_version).unwrap().len(), 2);

        // A single tag selects both prod instances.
        let by_env = RegistryFilter::new().tag("env", "prod");
        assert_eq!(region.lookup_filtered("api", &by_env).unwrap().len(), 2);

        // Multiple tags combine with AND: only the us-east prod instance.
        let by_env_and_region = RegistryFilter::new()
            .tag("env", "prod")
            .tag("region", "us-east");
        let narrowed = region.lookup_filtered("api", &by_env_and_region).unwrap();
        assert_eq!(narrowed.len(), 1);
        assert_eq!(narrowed[0].instance_id, 1);

        // No health was set explicitly, so every instance matches Healthy.
        let by_health = RegistryFilter::new().health(HealthStatus::Healthy);
        assert_eq!(region.lookup_filtered("api", &by_health).unwrap().len(), 3);
    }

    #[test]
    fn watch_notification() {
        let mut region = fresh_region(300);

        let baseline = region.version();

        region
            .register(ServiceRegistration::new("watched", "1.0.0", 1))
            .unwrap();
        assert!(region.version() > baseline);

        let mut watcher = RegistryWatcher::new(baseline);
        assert!(watcher.changed(region.version()));

        // Once the current version is acknowledged, nothing is pending.
        watcher.acknowledge(region.version());
        assert!(!watcher.changed(region.version()));

        // A further registration bumps the version again.
        region
            .register(ServiceRegistration::new("watched-2", "1.0.0", 2))
            .unwrap();
        assert!(watcher.changed(region.version()));
    }

    #[test]
    fn list_services() {
        let mut region = fresh_region(300);

        for (name, id) in [("svc-a", 1), ("svc-b", 2), ("svc-a", 3)] {
            region
                .register(ServiceRegistration::new(name, "1.0.0", id))
                .unwrap();
        }

        let mut services = region.list_services().unwrap();
        services.sort();
        assert_eq!(services, vec!["svc-a", "svc-b"]);
    }

    #[test]
    fn lookup_one() {
        let mut region = fresh_region(300);

        assert!(region.lookup_one("missing").unwrap().is_none());

        region
            .register(ServiceRegistration::new("singleton", "1.0.0", 42))
            .unwrap();

        let hit = region.lookup_one("singleton").unwrap();
        assert!(hit.is_some());
        assert_eq!(hit.map(|found| found.instance_id), Some(42));
    }

    #[test]
    fn update_registration() {
        let mut region = fresh_region(300);

        let first = ServiceRegistration::new("updatable", "1.0.0", 1)
            .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080));
        region.register(first).unwrap();

        // Registering the same instance_id again replaces the earlier entry.
        let replacement = ServiceRegistration::new("updatable", "1.1.0", 1)
            .with_endpoint(ServiceEndpoint::net([10, 0, 0, 2], 9090));
        region.register(replacement).unwrap();

        let found = region.lookup("updatable").unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].version, "1.1.0");
    }

    #[test]
    fn writer_reader_pair() {
        setup();
        let (mut writer, _reader) = RegistryBuilder::new().capacity(32).build().unwrap();

        writer
            .register(ServiceRegistration::new("pair-test", "1.0.0", 1))
            .unwrap();

        // Only the writer's own region is checked here.
        assert_eq!(writer.lookup("pair-test").unwrap().len(), 1);
    }

    #[test]
    fn set_draining() {
        let mut region = fresh_region(300);

        region
            .register(ServiceRegistration::new("drain-me", "1.0.0", 5))
            .unwrap();
        region.set_draining("drain-me", 5).unwrap();

        let found = region.lookup("drain-me").unwrap();
        assert!(matches!(found[0].health, HealthStatus::Draining));
    }

    #[test]
    fn endpoint_variants() {
        let mut region = fresh_region(300);

        let registration = ServiceRegistration::new("multi-endpoint", "1.0.0", 1)
            .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080))
            .with_endpoint(ServiceEndpoint::queue(b"topic://events"))
            .with_endpoint(ServiceEndpoint::store(b"bucket://data"))
            .with_endpoint(ServiceEndpoint::custom("grpc", b"grpc://10.0.0.1:50051"));
        region.register(registration).unwrap();

        let found = region.lookup("multi-endpoint").unwrap();
        assert_eq!(found[0].endpoints.len(), 4);
    }
}