grafos_registry/
region.rs1extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use grafos_collections::map::FabricHashMap;
8use grafos_std::error::Result;
9use grafos_std::host;
10use serde::{Deserialize, Serialize};
11
12use crate::filter::RegistryFilter;
13use crate::registration::{HealthStatus, ServiceRegistration};
14
15#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
17pub struct RegistrationSlot {
18 pub registration: ServiceRegistration,
19}
20
21pub struct RegistryRegion {
28 map: FabricHashMap<String, Vec<RegistrationSlot>>,
29 version: u64,
30 default_ttl_secs: u32,
31}
32
33impl RegistryRegion {
34 pub fn new(
36 capacity: usize,
37 default_ttl_secs: u32,
38 name_stride: usize,
39 slot_stride: usize,
40 ) -> Result<Self> {
41 let map = FabricHashMap::with_capacity(capacity, name_stride, slot_stride)?;
42 Ok(RegistryRegion {
43 map,
44 version: 0,
45 default_ttl_secs,
46 })
47 }
48
49 pub fn version(&self) -> u64 {
51 self.version
52 }
53
54 pub fn register(&mut self, mut reg: ServiceRegistration) -> Result<()> {
57 let now = host::unix_time_secs();
58 reg.lease_expires_at = now + self.default_ttl_secs as u64;
59
60 let name = reg.name.clone();
61 let mut slots = self.map.get(&name)?.unwrap_or_default();
62
63 slots.retain(|s| s.registration.instance_id != reg.instance_id);
65 slots.push(RegistrationSlot { registration: reg });
66
67 self.map.insert(&name, &slots)?;
68 self.version += 1;
69
70 #[cfg(feature = "observe")]
71 {
72 let ver = &slots.last().unwrap().registration.version;
73 crate::observe_hooks::on_service_registered(&name, ver);
74 }
75
76 Ok(())
77 }
78
79 pub fn deregister(&mut self, name: &str, instance_id: u128) -> Result<bool> {
81 let key = String::from(name);
82 let mut slots = self.map.get(&key)?.unwrap_or_default();
83 let before = slots.len();
84 slots.retain(|s| s.registration.instance_id != instance_id);
85 let removed = slots.len() < before;
86
87 if slots.is_empty() {
88 self.map.remove(&key)?;
89 } else {
90 self.map.insert(&key, &slots)?;
91 }
92
93 if removed {
94 self.version += 1;
95 #[cfg(feature = "observe")]
96 crate::observe_hooks::on_service_deregistered(name);
97 }
98 Ok(removed)
99 }
100
101 pub fn set_health(
103 &mut self,
104 name: &str,
105 instance_id: u128,
106 health: HealthStatus,
107 ) -> Result<bool> {
108 let key = String::from(name);
109 let mut slots = match self.map.get(&key)? {
110 Some(s) => s,
111 None => return Ok(false),
112 };
113
114 let mut found = false;
115 for slot in &mut slots {
116 if slot.registration.instance_id == instance_id {
117 slot.registration.health = health.clone();
118 found = true;
119 break;
120 }
121 }
122
123 if found {
124 self.map.insert(&key, &slots)?;
125 self.version += 1;
126 #[cfg(feature = "observe")]
127 {
128 let old_str = "unknown";
129 let new_str = match &health {
130 HealthStatus::Healthy => "healthy",
131 HealthStatus::Degraded { .. } => "degraded",
132 HealthStatus::Draining => "draining",
133 };
134 crate::observe_hooks::on_health_change(name, old_str, new_str);
135 }
136 }
137 Ok(found)
138 }
139
140 pub fn set_draining(&mut self, name: &str, instance_id: u128) -> Result<bool> {
142 self.set_health(name, instance_id, HealthStatus::Draining)
143 }
144
145 pub fn lookup(&self, name: &str) -> Result<Vec<ServiceRegistration>> {
147 let key = String::from(name);
148 match self.map.get(&key)? {
149 Some(slots) => Ok(slots.into_iter().map(|s| s.registration).collect()),
150 None => Ok(Vec::new()),
151 }
152 }
153
154 pub fn lookup_one(&self, name: &str) -> Result<Option<ServiceRegistration>> {
156 let results = self.lookup(name)?;
157 Ok(results.into_iter().next())
158 }
159
160 pub fn lookup_filtered(
162 &self,
163 name: &str,
164 filter: &RegistryFilter,
165 ) -> Result<Vec<ServiceRegistration>> {
166 let all = self.lookup(name)?;
167 Ok(all.into_iter().filter(|r| filter.matches(r)).collect())
168 }
169
170 pub fn list_services(&self) -> Result<Vec<String>> {
172 let mut names = Vec::new();
173 for item in self.map.iter() {
174 let (name, slots) = item?;
175 if !slots.is_empty() {
176 names.push(name);
177 }
178 }
179 Ok(names)
180 }
181
182 pub fn tick(&mut self) -> Result<usize> {
184 let now = host::unix_time_secs();
185 let mut total_pruned = 0;
186
187 let mut keys = Vec::new();
189 for item in self.map.iter() {
190 let (name, _) = item?;
191 keys.push(name);
192 }
193
194 for key in keys {
195 let mut slots = match self.map.get(&key)? {
196 Some(s) => s,
197 None => continue,
198 };
199 let before = slots.len();
200 slots.retain(|s| s.registration.lease_expires_at > now);
201 let pruned = before - slots.len();
202
203 if pruned > 0 {
204 if slots.is_empty() {
205 self.map.remove(&key)?;
206 } else {
207 self.map.insert(&key, &slots)?;
208 }
209 total_pruned += pruned;
210 }
211 }
212
213 if total_pruned > 0 {
214 self.version += 1;
215 }
216 Ok(total_pruned)
217 }
218}