grafos_jobs/
retry.rs

1//! Retry policy configuration.
2
3use grafos_leasekit::Backoff;
4use grafos_std::error::FabricError;
5
/// Outcome of classifying an error for retry purposes.
///
/// A value of this type answers a single question: is it worth attempting
/// the failed operation again?
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RetryableError {
    /// A transient failure; the operation may be retried.
    Transient,
    /// A permanent failure; retrying must not be attempted.
    Permanent,
}
14
/// Configuration governing how a failed job chunk is retried.
///
/// Under the default classification only `Disconnected` and `LeaseExpired`
/// are retried, using exponential backoff capped at `max_backoff_secs`.
/// Every other error (`Fenced`, `CapacityExceeded`, `IoError`,
/// `Unsupported`) counts as a permanent failure, i.e. the policy fails
/// closed.
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    /// Upper bound on retry attempts for a single chunk (default: 3).
    pub max_retries: u32,
    /// Delay before the first retry, in seconds (default: 1).
    pub initial_backoff_secs: u64,
    /// Ceiling for the backoff delay, in seconds (default: 16).
    pub max_backoff_secs: u64,
}
29
30impl Default for RetryPolicy {
31    fn default() -> Self {
32        RetryPolicy {
33            max_retries: 3,
34            initial_backoff_secs: 1,
35            max_backoff_secs: 16,
36        }
37    }
38}
39
40impl RetryPolicy {
41    /// Classify an error as transient (retryable) or permanent.
42    ///
43    /// Only `Disconnected` and `LeaseExpired` are transient. Everything
44    /// else is permanent — fail closed on decode/corruption/fencing.
45    pub fn classify(&self, err: &FabricError) -> RetryableError {
46        match err {
47            FabricError::Disconnected | FabricError::LeaseExpired => RetryableError::Transient,
48            _ => RetryableError::Permanent,
49        }
50    }
51
52    /// Create a [`Backoff`] instance from this policy.
53    pub fn backoff(&self) -> Backoff {
54        Backoff::new(self.initial_backoff_secs, self.max_backoff_secs)
55    }
56}
57
#[cfg(test)]
mod tests {
    use super::*;

    /// The default policy carries the documented values.
    #[test]
    fn default_policy_values() {
        let RetryPolicy {
            max_retries,
            initial_backoff_secs,
            max_backoff_secs,
        } = RetryPolicy::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_backoff_secs, 1);
        assert_eq!(max_backoff_secs, 16);
    }

    /// Connection loss and lease expiry are classified as retryable.
    #[test]
    fn classify_transient_errors() {
        let policy = RetryPolicy::default();
        let transient = [FabricError::Disconnected, FabricError::LeaseExpired];

        for err in &transient {
            assert_eq!(policy.classify(err), RetryableError::Transient);
        }
    }

    /// Every other error kind is classified as permanent.
    #[test]
    fn classify_permanent_errors() {
        let policy = RetryPolicy::default();
        let permanent = [
            FabricError::Fenced,
            FabricError::CapacityExceeded,
            FabricError::IoError(-1),
            FabricError::Unsupported,
        ];

        for err in &permanent {
            assert_eq!(policy.classify(err), RetryableError::Permanent);
        }
    }

    /// A backoff built from a policy starts at the initial delay and doubles.
    #[test]
    fn backoff_from_policy() {
        let policy = RetryPolicy {
            max_retries: 5,
            initial_backoff_secs: 2,
            max_backoff_secs: 32,
        };
        let mut backoff = policy.backoff();

        for expected in [2, 4, 8] {
            assert_eq!(backoff.next_delay(), expected);
        }
    }
}
113}