grafos_rpc/
transport.rs

1//! Transport abstraction for grafos-rpc.
2//!
3//! Two implementations:
4//! - [`SharedMemoryTransport`]: Hot path using leased shared memory (FBMU).
5//! - [`QuicTransport`]: Fallback for non-co-located services using QUIC bidi streams.
6//!
7//! The [`AutoTransport`] selects shared memory when co-located and QUIC when
8//! remote, presenting the same [`RpcTransport`] interface to generated clients.
9
10extern crate alloc;
11
12use alloc::boxed::Box;
13use alloc::vec::Vec;
14use core::sync::atomic::{AtomicU64, Ordering};
15
16use grafos_std::error::Result;
17use grafos_std::mem::MemLease;
18
19use crate::mux::{RpcMuxClient, DEFAULT_NUM_SLOTS, DEFAULT_SLOT_PAYLOAD_SIZE};
20
21/// Transport trait for making RPC calls.
22///
23/// Generated client stubs use this trait to send requests and receive
24/// responses, independent of whether the backing transport is shared memory
25/// or QUIC.
26pub trait RpcTransport {
27    /// Send a request with the given `method_id` and serialized `payload`,
28    /// returning the serialized response bytes.
29    fn call(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>>;
30}
31
32// ---------------------------------------------------------------------------
33// SharedMemoryTransport
34// ---------------------------------------------------------------------------
35
36/// Transport backed by a leased shared memory region (FBMU).
37///
38/// Uses [`RpcMuxClient`] internally for multi-slot concurrency.
39pub struct SharedMemoryTransport<'a> {
40    mux: RpcMuxClient<'a>,
41}
42
43impl<'a> SharedMemoryTransport<'a> {
44    /// Create a shared memory transport using the given lease with default
45    /// slot count and payload size.
46    pub fn new(lease: &'a MemLease) -> Self {
47        SharedMemoryTransport {
48            mux: RpcMuxClient::new(lease, DEFAULT_NUM_SLOTS, DEFAULT_SLOT_PAYLOAD_SIZE),
49        }
50    }
51
52    /// Create with custom slot parameters.
53    pub fn with_slots(lease: &'a MemLease, num_slots: usize, slot_payload_size: usize) -> Self {
54        SharedMemoryTransport {
55            mux: RpcMuxClient::new(lease, num_slots, slot_payload_size),
56        }
57    }
58}
59
60impl RpcTransport for SharedMemoryTransport<'_> {
61    fn call(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
62        self.mux.call(method_id, payload)
63    }
64}
65
66// ---------------------------------------------------------------------------
67// QuicTransport
68// ---------------------------------------------------------------------------
69
70/// QUIC-based RPC transport for non-co-located services.
71///
72/// Encodes requests as a simple framed message on a QUIC bidi stream:
73///
74/// ```text
75/// [method_id: u32 LE (4B)] [request_id: u64 LE (8B)]
76/// [payload_len: u32 LE (4B)] [payload: [u8; payload_len]]
77/// ```
78///
79/// The response is:
80///
81/// ```text
82/// [request_id: u64 LE (8B)] [status: u8 (1B)]
83/// [payload_len: u32 LE (4B)] [payload: [u8; payload_len]]
84/// ```
85///
86/// This is a stub transport that delegates to a user-provided send/receive
87/// callback, since the actual QUIC connection setup varies by platform.
88#[allow(clippy::type_complexity)]
89pub struct QuicTransport {
90    /// Callback: takes (method_id, request_id, payload) -> response bytes.
91    sender: Box<dyn Fn(u32, u64, &[u8]) -> Result<Vec<u8>> + Send + Sync>,
92    next_request_id: AtomicU64,
93}
94
95impl QuicTransport {
96    /// Create a QUIC transport with a send/receive callback.
97    ///
98    /// The callback receives `(method_id, request_id, payload)` and must
99    /// return the serialized response bytes or an error.
100    #[allow(clippy::type_complexity)]
101    pub fn new(sender: Box<dyn Fn(u32, u64, &[u8]) -> Result<Vec<u8>> + Send + Sync>) -> Self {
102        QuicTransport {
103            sender,
104            next_request_id: AtomicU64::new(1),
105        }
106    }
107}
108
109impl RpcTransport for QuicTransport {
110    fn call(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
111        let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
112        (self.sender)(method_id, request_id, payload)
113    }
114}
115
116// ---------------------------------------------------------------------------
117// AutoTransport
118// ---------------------------------------------------------------------------
119
120/// Transport that selects shared memory when co-located and QUIC when remote.
121///
122/// Wraps either a [`SharedMemoryTransport`] or a [`QuicTransport`] behind
123/// the [`RpcTransport`] trait.
124pub enum AutoTransport<'a> {
125    SharedMemory(SharedMemoryTransport<'a>),
126    Quic(QuicTransport),
127}
128
129impl RpcTransport for AutoTransport<'_> {
130    fn call(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
131        match self {
132            AutoTransport::SharedMemory(t) => t.call(method_id, payload),
133            AutoTransport::Quic(t) => t.call(method_id, payload),
134        }
135    }
136}
137
138// ---------------------------------------------------------------------------
139// RpcHandler adapter for server traits
140// ---------------------------------------------------------------------------
141
142/// Adapter that bridges a generated `{Service}Server` dispatch function to
143/// the [`crate::RpcHandler`] trait used by [`crate::RpcServer`] and
144/// [`crate::mux::RpcMuxServer`].
145///
146/// Use this to plug a proc-macro-generated service into the mux server:
147///
148/// ```ignore
149/// let adapter = ServiceHandlerAdapter::new(my_impl);
150/// let server = RpcMuxServer::new(&lease, 8, 4096);
151/// server.poll_once(&adapter)?;
152/// ```
153pub struct ServiceHandlerAdapter<F> {
154    dispatch: F,
155}
156
157impl<F> ServiceHandlerAdapter<F>
158where
159    F: Fn(u32, &[u8]) -> Result<Vec<u8>>,
160{
161    pub fn new(dispatch: F) -> Self {
162        ServiceHandlerAdapter { dispatch }
163    }
164}
165
166impl<F> crate::RpcHandler for ServiceHandlerAdapter<F>
167where
168    F: Fn(u32, &[u8]) -> Result<Vec<u8>>,
169{
170    fn handle(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
171        (self.dispatch)(method_id, payload)
172    }
173}
174
175// ---------------------------------------------------------------------------
176// Tests
177// ---------------------------------------------------------------------------
178
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::error::FabricError;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// With no server answering, a shared memory call is expected to fail
    /// with `LeaseExpired` once the poll limit is reached.
    #[test]
    fn shared_memory_transport_call() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new()
            .min_bytes(65536)
            .acquire()
            .unwrap();
        // The default constructor only has to build successfully here.
        let _default = SharedMemoryTransport::new(&lease);

        // Nothing is serving the other side, so cap the polling via the mux
        // client to keep the call short.
        let mux = RpcMuxClient::new(&lease, 8, 4096).with_max_poll_iterations(5);
        let transport = SharedMemoryTransport { mux };

        let outcome = transport.call(0, b"hello");
        assert_eq!(outcome.unwrap_err(), FabricError::LeaseExpired);
    }

    /// The callback's return value comes back from `call` untouched.
    #[test]
    fn quic_transport_call() {
        // Echo callback: little-endian method id followed by the payload.
        let transport = QuicTransport::new(Box::new(|method_id, _req_id, payload| {
            let mut resp = Vec::from(method_id.to_le_bytes());
            resp.extend_from_slice(payload);
            Ok(resp)
        }));

        let resp = transport.call(42, b"test").unwrap();
        let (head, tail) = resp.split_at(4);
        assert_eq!(head, &42u32.to_le_bytes());
        assert_eq!(tail, b"test");
    }

    /// Consecutive calls see request ids 1, 2, 3.
    #[test]
    fn quic_transport_increments_request_id() {
        use alloc::sync::Arc;
        use std::sync::Mutex;

        let seen_ids = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&seen_ids);
        let transport = QuicTransport::new(Box::new(move |_mid, req_id, _payload| {
            recorder.lock().unwrap().push(req_id);
            Ok(Vec::new())
        }));

        // One call per single-byte payload: b"a", b"b", b"c".
        for payload in b"abc".chunks(1) {
            transport.call(0, payload).unwrap();
        }

        let recorded = seen_ids.lock().unwrap();
        assert_eq!(*recorded, [1u64, 2, 3]);
    }

    /// An error produced by the callback reaches the caller unchanged.
    #[test]
    fn quic_transport_error_propagation() {
        let transport = QuicTransport::new(Box::new(|_mid, _req_id, _payload| {
            Err(FabricError::IoError(-500))
        }));

        let err = transport.call(0, b"fail").unwrap_err();
        assert_eq!(err, FabricError::IoError(-500));
    }

    /// The `Quic` variant of `AutoTransport` forwards to the QUIC transport.
    #[test]
    fn auto_transport_quic_variant() {
        let auto = AutoTransport::Quic(QuicTransport::new(Box::new(
            |_mid, _req_id, payload| Ok(payload.to_vec()),
        )));

        assert_eq!(auto.call(0, b"echo").unwrap(), b"echo");
    }

    /// The adapter passes both successes and errors through from the
    /// dispatch function.
    #[test]
    fn service_handler_adapter() {
        use crate::RpcHandler;

        let adapter = ServiceHandlerAdapter::new(|method_id: u32, payload: &[u8]| match method_id {
            0 => Ok(payload.to_vec()),
            _ => Err(FabricError::Unsupported),
        });

        assert_eq!(adapter.handle(0, b"hello").unwrap(), b"hello");
        assert_eq!(
            adapter.handle(99, b"fail").unwrap_err(),
            FabricError::Unsupported
        );
    }
}