1extern crate alloc;
4use alloc::boxed::Box;
5use alloc::vec::Vec;
6
7use crate::backoff::Backoff;
8use crate::policy::RenewalPolicy;
9
10struct Entry {
12 lease_id: u128,
13 created_at: u64,
14 expires_at: u64,
15 policy: RenewalPolicy,
16 backoff: Backoff,
17 next_retry_at: u64,
19 last_renewed_at: Option<u64>,
22}
23
24impl Entry {
25 fn renewal_deadline(&self) -> u64 {
26 let created = self.last_renewed_at.unwrap_or(self.created_at);
27 let jitter_seed = self.lease_id as u64;
28 self.policy
29 .renewal_deadline(created, self.expires_at, jitter_seed)
30 }
31
32 fn is_near_expiry(&self, now: u64) -> bool {
33 if now >= self.expires_at {
34 return true;
35 }
36 let remaining = self.expires_at - now;
37 let total_ttl = self
38 .expires_at
39 .saturating_sub(self.last_renewed_at.unwrap_or(self.created_at));
40 if total_ttl == 0 {
41 return true;
42 }
43 remaining < total_ttl / 10
45 }
46}
47
/// Outcome counters for a single pass over the managed leases.
#[derive(Clone, Debug, Default)]
pub struct RenewalSummary {
    /// Number of leases whose renewal attempt succeeded during the pass.
    pub renewed: u32,
    /// Number of leases for which no renewal was attempted: already expired,
    /// not yet due for renewal, or still waiting out a retry delay.
    pub skipped: u32,
    /// Number of leases whose renewal attempt returned an error.
    pub failed: u32,
    /// Ids of the leases that were expired or near expiry when the pass
    /// inspected them.
    pub near_expiry: Vec<u128>,
}
60
61pub struct RenewalManager {
70 entries: Vec<Entry>,
71 revoke_callbacks: Vec<Box<dyn Fn(u128, u8)>>,
72}
73
74impl RenewalManager {
75 pub fn new() -> Self {
77 RenewalManager {
78 entries: Vec::new(),
79 revoke_callbacks: Vec::new(),
80 }
81 }
82
83 pub fn register(&mut self, lease_id: u128, expires_at: u64, policy: RenewalPolicy) {
88 let created_at = expires_at.saturating_sub(policy.min_renew_secs);
89 self.register_with_created_at(lease_id, created_at, expires_at, policy);
90 }
91
92 pub fn register_with_created_at(
94 &mut self,
95 lease_id: u128,
96 created_at: u64,
97 expires_at: u64,
98 policy: RenewalPolicy,
99 ) {
100 self.entries.retain(|e| e.lease_id != lease_id);
102 let backoff = Backoff::new(1, policy.max_backoff_secs);
103 self.entries.push(Entry {
104 lease_id,
105 created_at,
106 expires_at,
107 policy,
108 backoff,
109 next_retry_at: 0,
110 last_renewed_at: None,
111 });
112 }
113
114 pub fn unregister(&mut self, lease_id: u128) {
116 self.entries.retain(|e| e.lease_id != lease_id);
117 }
118
119 pub fn len(&self) -> usize {
121 self.entries.len()
122 }
123
124 pub fn is_empty(&self) -> bool {
126 self.entries.is_empty()
127 }
128
129 pub fn tick(&mut self, now_unix_secs: u64) -> RenewalSummary {
133 self.tick_with(now_unix_secs, |_lease_id, duration| {
134 Ok(duration)
136 })
137 }
138
139 pub fn tick_with<F>(&mut self, now_unix_secs: u64, mut renew_fn: F) -> RenewalSummary
144 where
145 F: FnMut(u128, u64) -> Result<u64, ()>,
146 {
147 let mut summary = RenewalSummary::default();
148
149 for entry in self.entries.iter_mut() {
150 if entry.is_near_expiry(now_unix_secs) {
152 summary.near_expiry.push(entry.lease_id);
153 }
154
155 if now_unix_secs >= entry.expires_at {
157 summary.skipped += 1;
158 continue;
159 }
160
161 let deadline = entry.renewal_deadline();
163 if now_unix_secs < deadline {
164 summary.skipped += 1;
165 continue;
166 }
167
168 if now_unix_secs < entry.next_retry_at {
170 summary.skipped += 1;
171 continue;
172 }
173
174 let duration = entry.policy.min_renew_secs;
176 match renew_fn(entry.lease_id, duration) {
177 Ok(new_expires_at) => {
178 let new_exp = now_unix_secs.saturating_add(new_expires_at);
179 entry.last_renewed_at = Some(now_unix_secs);
180 entry.expires_at = new_exp;
181 entry.backoff.reset();
182 entry.next_retry_at = 0;
183 summary.renewed += 1;
184
185 #[cfg(feature = "observe")]
186 emit_renew_success(entry.lease_id);
187 }
188 Err(()) => {
189 let delay = entry.backoff.next_delay();
190 entry.next_retry_at = now_unix_secs.saturating_add(delay);
191 summary.failed += 1;
192
193 #[cfg(feature = "observe")]
194 emit_renew_failure(entry.lease_id);
195 }
196 }
197 }
198
199 #[cfg(feature = "observe")]
200 emit_tick_metrics(&summary, self.entries.len());
201
202 summary
203 }
204
205 pub fn is_near_expiry(&self, lease_id: u128, now: u64) -> bool {
207 self.entries
208 .iter()
209 .find(|e| e.lease_id == lease_id)
210 .map(|e| e.is_near_expiry(now))
211 .unwrap_or(false)
212 }
213
214 pub fn on_revoked<F: Fn(u128, u8) + 'static>(&mut self, callback: F) {
219 self.revoke_callbacks.push(Box::new(callback));
220 }
221
222 pub fn notify_revoked(&self, lease_id: u128, reason: u8) {
226 for cb in &self.revoke_callbacks {
227 cb(lease_id, reason);
228 }
229 }
230
231 pub fn expires_at(&self, lease_id: u128) -> Option<u64> {
233 self.entries
234 .iter()
235 .find(|e| e.lease_id == lease_id)
236 .map(|e| e.expires_at)
237 }
238}
239
240impl Default for RenewalManager {
241 fn default() -> Self {
242 Self::new()
243 }
244}
245
/// Records a successful renewal by incrementing the global `ops_total` counter.
///
/// Only compiled with the `observe` feature.
#[cfg(feature = "observe")]
fn emit_renew_success(lease_id: u128) {
    grafos_observe::FabricMetrics::global().ops_total.inc();
    // The lease id is not attached to the metric; discard it explicitly.
    let _ = lease_id;
}
251
/// Records a failed renewal by incrementing the global `ops_total` counter.
///
/// Only compiled with the `observe` feature.
// NOTE(review): this bumps the same counter as the success path; confirm
// whether failures are meant to be counted separately.
#[cfg(feature = "observe")]
fn emit_renew_failure(lease_id: u128) {
    grafos_observe::FabricMetrics::global().ops_total.inc();
    // The lease id is not attached to the metric; discard it explicitly.
    let _ = lease_id;
}
257
/// Hook for per-tick metrics; currently a placeholder that records nothing.
///
/// Only compiled with the `observe` feature.
#[cfg(feature = "observe")]
fn emit_tick_metrics(summary: &RenewalSummary, managed_count: usize) {
    // Both arguments are intentionally unused for now.
    let _ = summary;
    let _ = managed_count;
}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Policy used by most tests: renewal fraction 0.75, no jitter.
    fn test_policy() -> RenewalPolicy {
        RenewalPolicy {
            renew_at_fraction: 0.75,
            jitter_fraction: 0.0,
            min_renew_secs: 100,
            max_backoff_secs: 5,
        }
    }

    /// Manager tracking lease `1`, created at 1000 and expiring at 1100.
    fn manager_with_lease() -> RenewalManager {
        let mut mgr = RenewalManager::new();
        mgr.register_with_created_at(1, 1000, 1100, test_policy());
        mgr
    }

    #[test]
    fn register_and_tick_before_deadline() {
        let mut mgr = manager_with_lease();

        // 1050 is before the renewal deadline, so nothing is attempted.
        let summary = mgr.tick(1050);
        assert_eq!(summary.renewed, 0);
        assert_eq!(summary.skipped, 1);
        assert_eq!(summary.failed, 0);
    }

    #[test]
    fn tick_at_deadline_triggers_renewal() {
        let mut mgr = manager_with_lease();

        let summary = mgr.tick(1075);
        assert_eq!(summary.renewed, 1);
        assert_eq!(summary.skipped, 0);
    }

    #[test]
    fn tick_after_expiry_skips() {
        let mut mgr = manager_with_lease();

        let summary = mgr.tick(1200);
        assert_eq!(summary.renewed, 0);
        assert_eq!(summary.skipped, 1);
    }

    #[test]
    fn tick_with_failure_triggers_backoff() {
        let mut mgr = manager_with_lease();

        // The first attempt fails and schedules a retry.
        let summary = mgr.tick_with(1075, |_, _| Err(()));
        assert_eq!(summary.failed, 1);
        assert_eq!(summary.renewed, 0);

        // Same instant: still inside the retry delay.
        let summary = mgr.tick_with(1075, |_, _| Ok(100));
        assert_eq!(summary.skipped, 1);
        assert_eq!(summary.renewed, 0);

        // One second later the retry is allowed.
        let summary = mgr.tick_with(1076, |_, _| Ok(100));
        assert_eq!(summary.renewed, 1);
    }

    #[test]
    fn multiple_leases() {
        let mut mgr = RenewalManager::new();
        let half_way = RenewalPolicy {
            renew_at_fraction: 0.50,
            ..test_policy()
        };

        mgr.register_with_created_at(1, 1000, 1100, half_way);
        mgr.register_with_created_at(2, 1000, 1200, half_way);
        mgr.register_with_created_at(3, 1000, 1300, half_way);

        // Only the first lease has reached its deadline.
        let summary = mgr.tick(1050);
        assert_eq!(summary.renewed, 1);
        assert_eq!(summary.skipped, 2);

        let summary = mgr.tick(1100);
        assert!(summary.renewed >= 1);
    }

    #[test]
    fn unregister_removes_lease() {
        let mut mgr = RenewalManager::new();
        let policy = RenewalPolicy::default();
        mgr.register(1, 1100, policy);
        mgr.register(2, 1200, policy);
        assert_eq!(mgr.len(), 2);

        mgr.unregister(1);
        assert_eq!(mgr.len(), 1);
        assert!(mgr.expires_at(1).is_none());
        assert!(mgr.expires_at(2).is_some());
    }

    #[test]
    fn is_near_expiry_predicate() {
        let mgr = manager_with_lease();

        // Half of the window left.
        assert!(!mgr.is_near_expiry(1, 1050));
        // Less than a tenth of the window left.
        assert!(mgr.is_near_expiry(1, 1091));
        // Exactly at expiry.
        assert!(mgr.is_near_expiry(1, 1100));
    }

    #[test]
    fn near_expiry_in_summary() {
        let mut mgr = manager_with_lease();

        let summary = mgr.tick(1091);
        assert!(summary.near_expiry.contains(&1));
    }

    #[test]
    fn re_register_replaces_entry() {
        let mut mgr = RenewalManager::new();
        let policy = RenewalPolicy::default();
        mgr.register(1, 1100, policy);
        assert_eq!(mgr.expires_at(1), Some(1100));

        mgr.register(1, 2200, policy);
        assert_eq!(mgr.len(), 1);
        assert_eq!(mgr.expires_at(1), Some(2200));
    }

    #[test]
    fn unknown_lease_is_not_near_expiry() {
        let mgr = RenewalManager::new();
        assert!(!mgr.is_near_expiry(999, 5000));
    }

    #[test]
    fn renewal_extends_expiry() {
        let mut mgr = manager_with_lease();

        let before = mgr.expires_at(1).unwrap();
        assert_eq!(before, 1100);

        mgr.tick(1080);
        // Renewed at 1080 for 100 seconds.
        let after = mgr.expires_at(1).unwrap();
        assert_eq!(after, 1180);
    }

    #[test]
    fn on_revoked_callback_fires() {
        use alloc::sync::Arc;
        use core::sync::atomic::{AtomicBool, Ordering};

        let mut mgr = RenewalManager::new();
        let fired = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&fired);
        mgr.on_revoked(move |_lease_id, _reason| {
            flag.store(true, Ordering::SeqCst);
        });

        mgr.notify_revoked(42, 1);
        assert!(fired.load(Ordering::SeqCst));
    }

    #[test]
    fn multiple_revocation_callbacks_fire() {
        use alloc::sync::Arc;
        use core::sync::atomic::{AtomicBool, Ordering};

        let mut mgr = RenewalManager::new();

        let fired_a = Arc::new(AtomicBool::new(false));
        let fired_b = Arc::new(AtomicBool::new(false));

        let flag_a = Arc::clone(&fired_a);
        mgr.on_revoked(move |_, _| {
            flag_a.store(true, Ordering::SeqCst);
        });

        let flag_b = Arc::clone(&fired_b);
        mgr.on_revoked(move |_, _| {
            flag_b.store(true, Ordering::SeqCst);
        });

        mgr.notify_revoked(99, 2);
        assert!(fired_a.load(Ordering::SeqCst));
        assert!(fired_b.load(Ordering::SeqCst));
    }

    #[test]
    fn notify_revoked_with_no_callbacks_is_noop() {
        let mgr = RenewalManager::new();
        // Must simply return without panicking.
        mgr.notify_revoked(7, 0);
    }

    #[test]
    fn revocation_callback_receives_correct_args() {
        use alloc::sync::Arc;
        use std::sync::Mutex;

        let mut mgr = RenewalManager::new();
        let log: Arc<Mutex<Vec<(u128, u8)>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&log);
        mgr.on_revoked(move |lease_id, reason| {
            sink.lock().unwrap().push((lease_id, reason));
        });

        mgr.notify_revoked(42, 1);
        mgr.notify_revoked(1000, 255);

        let seen = log.lock().unwrap();
        assert_eq!(seen.len(), 2);
        assert_eq!(seen[0], (42, 1));
        assert_eq!(seen[1], (1000, 255));
    }

    #[test]
    fn backoff_escalation_on_repeated_failure() {
        let mut mgr = manager_with_lease();

        // First failure.
        let summary = mgr.tick_with(1075, |_, _| Err(()));
        assert_eq!(summary.failed, 1);

        // Second failure, attempted once the first delay has elapsed.
        let summary = mgr.tick_with(1076, |_, _| Err(()));
        assert_eq!(summary.failed, 1);

        // The delay has grown, so one second later is still too early.
        let summary = mgr.tick_with(1077, |_, _| Ok(100));
        assert_eq!(summary.skipped, 1);

        let summary = mgr.tick_with(1078, |_, _| Ok(100));
        assert_eq!(summary.renewed, 1);
    }
}