grafos_std/
cpu.rs

1//! CPU tasklet resource module.
2//!
3//! Provides typed access to fabric CPU resources via TASKLET_SUBMIT host
4//! functions. Programs submit WASM binaries for execution on remote fabric
5//! nodes with configurable fuel limits.
6//!
7//! The underlying TASKLET_SUBMIT control op (0x0500) is implemented in
8//! fabricbiosd. On `wasm32` targets, the host functions link to real imports
9//! (`fabricbios_tasklet_v0`). On native targets, mock implementations
10//! allow testing without a runtime.
11//!
12//! # Example
13//!
14//! ```rust
15//! use grafos_std::cpu::{CpuBuilder, FabricCpu};
16//!
17//! # grafos_std::host::reset_mock();
18//! # grafos_std::host::mock_set_tasklet_result(0, b"hello");
19//! let lease = CpuBuilder::new().single_core().lease_secs(60).acquire()?;
20//! let result = lease.cpu().submit(&[0x00, 0x61, 0x73, 0x6d])
21//!     .fuel(500_000)
22//!     .input(b"test input")
23//!     .launch()?;
24//! assert_eq!(result.exit_code, 0);
25//! assert_eq!(&result.output, b"hello");
26//! # Ok::<(), grafos_std::FabricError>(())
27//! ```
28
29extern crate alloc;
30use alloc::vec;
31use alloc::vec::Vec;
32
33use crate::error::Result;
34use crate::host;
35use crate::lease::{self, LeaseInfo, LeaseStatus, SharedLeaseState};
36
/// Lease tag used by this module: it is passed to `lease::new_shared_lease`
/// when a CPU lease is created and to the `observe` hooks when a CPU lease is
/// acquired or dropped.
const LEASE_TAG_CPU: u8 = 0x04;
38
39// Re-export the shared-memory tasklet SDK surface so users can write
40// `use grafos_std::cpu::{CoordinatorCtx, WorkerCtx, ...}`. On host targets
41// with `std`, this resolves to the host mock (which includes
42// `SharedTaskletBuilder`). On wasm32, it resolves to the real guest-side
43// implementation (which exposes `run_shared_memory_tasklet` instead of
44// the builder).
45pub use crate::cpu_shared::partition;
46
47#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
48pub use crate::cpu_shared::{
49    CoordinatorCtx, SharedRegion, SharedTaskletBuilder, SharedTaskletResult, TaskletError,
50    WorkerCtx, WorkerScratch,
51};
52
53#[cfg(target_arch = "wasm32")]
54pub use crate::cpu_shared::{
55    run_shared_memory_tasklet, Cancelled, CoordinatorCtx, SharedRegion, SharedState,
56    SharedTaskletResult, TaskletError, WorkerCtx, WorkerScratch,
57};
58
/// Result of a completed tasklet execution.
///
/// Returned by [`TaskletBuilder::launch`] upon successful completion of a
/// CPU tasklet.
///
/// The type is plain data, so it implements `PartialEq`/`Eq` in addition to
/// `Debug`/`Clone`; callers and tests can compare whole results with `==` or
/// `assert_eq!` instead of comparing field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskletResult {
    /// Exit code of the tasklet (0 typically indicates success).
    pub exit_code: u64,
    /// Captured output from the tasklet, if any.
    ///
    /// `launch` truncates this buffer to the output length reported by the
    /// host, so it never carries trailing padding from the scratch buffer.
    pub output: Vec<u8>,
}
70
/// Safe handle to a fabric CPU resource for tasklet dispatch.
///
/// Provides the entry point for submitting WASM binaries as CPU tasklets.
/// The underlying TASKLET_SUBMIT op (0x0500) handles WASM loading,
/// fuel-limited execution, and output capture.
///
/// Current grafOS tasklet runtimes execute one tasklet thread per submission.
/// Reserving more CPU cores affects available CPU capacity and placement, but
/// does not implicitly make one tasklet invocation multi-threaded.
pub struct FabricCpu {
    /// Lease state consulted by `ensure_active` before a tasklet is launched.
    /// When `None`, that check is skipped and the handle is treated as active.
    lease_state: Option<SharedLeaseState>,
}
83
84impl FabricCpu {
85    /// Begin building a CPU tasklet submission.
86    ///
87    /// Returns a [`TaskletBuilder`] that can be configured with fuel limits
88    /// and input data before launching.
89    ///
90    /// # Parameters
91    ///
92    /// - `wasm`: The WASM binary to execute as a tasklet.
93    pub fn submit<'a>(&'a self, wasm: &'a [u8]) -> TaskletBuilder<'a> {
94        TaskletBuilder {
95            _cpu: self,
96            wasm,
97            input: &[],
98            fuel: 1_000_000,
99            max_output: 4096,
100        }
101    }
102
103    fn ensure_active(&self) -> Result<()> {
104        if let Some(state) = &self.lease_state {
105            lease::ensure_active(state)
106        } else {
107            Ok(())
108        }
109    }
110}
111
/// Builder for configuring and launching a CPU tasklet.
///
/// Configure the fuel limit and input data, then call
/// [`launch`](TaskletBuilder::launch) to execute.
pub struct TaskletBuilder<'a> {
    /// CPU handle this submission was created from; `launch` asks it whether
    /// the backing lease is still active before submitting.
    _cpu: &'a FabricCpu,
    /// WASM binary handed to `host::tasklet_submit`.
    wasm: &'a [u8],
    /// Input bytes handed to `host::tasklet_submit`; empty unless set via `input`.
    input: &'a [u8],
    /// Fuel limit handed to `host::tasklet_submit`; 1,000,000 unless set via `fuel`.
    fuel: u64,
    /// Size in bytes of the output buffer allocated by `launch`; 4096 unless
    /// set via `max_output`.
    max_output: usize,
}
123
124impl<'a> TaskletBuilder<'a> {
125    /// Set the fuel limit for the tasklet.
126    ///
127    /// Fuel is consumed by WASM instructions. The tasklet will be
128    /// terminated when fuel runs out. Default is 1,000,000.
129    pub fn fuel(mut self, n: u64) -> Self {
130        self.fuel = n;
131        self
132    }
133
134    /// Set the input data for the tasklet.
135    pub fn input(mut self, data: &'a [u8]) -> Self {
136        self.input = data;
137        self
138    }
139
140    /// Set the maximum output buffer size.
141    ///
142    /// Default is 4096 bytes. Increase if the tasklet produces more output.
143    pub fn max_output(mut self, n: usize) -> Self {
144        self.max_output = n;
145        self
146    }
147
148    /// Launch the tasklet and wait for completion.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`crate::error::FabricError::Disconnected`] if the host connection fails,
153    /// or another [`crate::error::FabricError`] variant based on the host status code.
154    pub fn launch(self) -> Result<TaskletResult> {
155        self._cpu.ensure_active()?;
156        #[cfg(feature = "observe")]
157        let start = std::time::Instant::now();
158        let mut output_buf = vec![0u8; self.max_output];
159        let result = host::tasklet_submit(self.wasm, self.input, self.fuel, &mut output_buf);
160        #[cfg(feature = "observe")]
161        match &result {
162            Ok(_) => {
163                let duration_us = start.elapsed().as_micros() as u64;
164                crate::observe_hooks::on_tasklet_submit();
165                crate::observe_hooks::on_op_completed(
166                    grafos_observe::OpType::TaskletSubmit,
167                    duration_us,
168                    0,
169                );
170            }
171            Err(e) => {
172                crate::observe_hooks::on_op_error();
173                crate::observe_hooks::on_op_failed(
174                    grafos_observe::OpType::TaskletSubmit,
175                    &alloc::format!("{e:?}"),
176                );
177            }
178        }
179        let (exit_code, output_len) = result?;
180        output_buf.truncate(output_len);
181        Ok(TaskletResult {
182            exit_code,
183            output: output_buf,
184        })
185    }
186}
187
/// A CPU lease that auto-frees on drop.
///
/// Wraps a [`FabricCpu`] handle with RAII lifecycle management.
/// Created via [`CpuBuilder::acquire`].
pub struct CpuLease {
    /// Shared lease state that the accessor methods and `Drop` operate on.
    state: SharedLeaseState,
    /// CPU handle exposed through `cpu()`; `CpuBuilder::acquire` gives it the
    /// same lease state that `state` was cloned from.
    cpu: FabricCpu,
}
196
// Every method below is a thin delegation to the `lease` module, applied to
// this lease's shared state.
impl CpuLease {
    /// Access the underlying [`FabricCpu`] handle.
    ///
    /// Tasklets are submitted through the returned handle
    /// (see [`FabricCpu::submit`]).
    pub fn cpu(&self) -> &FabricCpu {
        &self.cpu
    }

    /// Lease metadata snapshot (id, creation time, expiry, and status).
    pub fn info(&self) -> LeaseInfo {
        lease::info(&self.state)
    }

    /// Unique lease identifier.
    pub fn lease_id(&self) -> u128 {
        lease::lease_id(&self.state)
    }

    /// Lease creation timestamp (unix seconds).
    pub fn created_at_unix_secs(&self) -> u64 {
        lease::created_at_unix_secs(&self.state)
    }

    /// Lease expiry timestamp (unix seconds).
    pub fn expires_at_unix_secs(&self) -> u64 {
        lease::expires_at_unix_secs(&self.state)
    }

    /// Current lease status.
    pub fn status(&self) -> LeaseStatus {
        lease::status(&self.state)
    }

    /// Renew the lease TTL by `duration_secs`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `lease::renew` reports.
    pub fn renew(&self, duration_secs: u64) -> Result<()> {
        lease::renew(&self.state, duration_secs)
    }

    /// Explicitly revoke/free this lease.
    ///
    /// Takes `&self`, so the lease value remains usable afterwards and its
    /// `Drop` impl will call `lease::free` on the same state a second time.
    // NOTE(review): this relies on `lease::free` tolerating repeated calls —
    // confirm against the `lease` module.
    pub fn free(&self) {
        lease::free(&self.state);
    }
}
238
/// Frees the lease when the `CpuLease` value goes out of scope.
impl Drop for CpuLease {
    fn drop(&mut self) {
        // With the `observe` feature enabled, report the drop (tagged as a CPU
        // lease, with its id) before the lease is freed.
        #[cfg(feature = "observe")]
        crate::observe_hooks::on_lease_dropped(LEASE_TAG_CPU, lease::lease_id(&self.state));
        // Runs unconditionally, even if `free()` was already called explicitly.
        lease::free(&self.state);
    }
}
246
/// Builder for acquiring a fabric CPU lease.
///
/// Allows specifying reserved CPU capacity and lease duration.
///
/// `cores(n)` reserves CPU capacity for the lease. In current tasklet
/// runtimes, execution width is still one tasklet thread per invocation unless
/// a future profile version defines otherwise.
///
/// # Examples
///
/// ```rust
/// use grafos_std::cpu::CpuBuilder;
///
/// # grafos_std::host::reset_mock();
/// let lease = CpuBuilder::new().single_core().lease_secs(120).acquire()?;
/// // lease.cpu() is available for submit() calls
/// # Ok::<(), grafos_std::FabricError>(())
/// ```
// NOTE(review): `acquire` in this file reads only `_lease_secs`; `_cores`,
// `_isolation` and `_affinities` are recorded by the setters but not passed
// on when the lease is created here — confirm whether that is intentional.
pub struct CpuBuilder {
    /// Requested number of CPU cores (set by `cores` / `single_core`; starts at 1).
    _cores: u32,
    /// Requested lease duration in seconds (set by `lease_secs`; starts at 300).
    _lease_secs: u32,
    /// Requested isolation class; `None` means no preference was expressed.
    _isolation: Option<CpuIsolationClass>,
    /// Affinity and anti-affinity constraints, in the order they were added.
    _affinities: Vec<crate::affinity::Affinity>,
}
271
/// Per-lease CPU isolation class.
///
/// Mirrors `fabricbios_core::lease_cpu_isolation::CpuIsolationClass` at the
/// SDK layer.  See `docs/spec/cpu-isolation-wire-format.md` for the wire
/// format and `docs/spec/resource-isolation-and-exclusivity.md` §4.2 for
/// the semantics.
///
/// When passed to [`CpuBuilder::isolation`], the runtime emits
/// `TLV_LEASE_CPU_ISOLATION` (0x0902) on the `LEASE_ALLOC` request.
/// Unsupported classes fail closed — the node rejects the lease.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CpuIsolationClass {
    /// Default — no pinning beyond cgroup quota.
    BestEffort,
    /// Lease owns a full core; no SMT sibling sharing.
    WholeCore,
    /// `WholeCore` plus topology/NUMA constraints.
    StrictIsolated,
}
291
292impl CpuBuilder {
293    /// Create a new builder with defaults (1 core, 300s lease, no isolation
294    /// preference โ€” daemon default applies).
295    pub fn new() -> Self {
296        CpuBuilder {
297            _cores: 1,
298            _lease_secs: 300,
299            _isolation: None,
300            _affinities: Vec::new(),
301        }
302    }
303
304    /// Convenience for the common case: reserve one CPU core for a
305    /// single-threaded tasklet.
306    pub fn single_core(mut self) -> Self {
307        self._cores = 1;
308        self
309    }
310
311    /// Set the number of CPU cores requested.
312    ///
313    /// This reserves CPU capacity for the lease. It does not implicitly set
314    /// tasklet execution width or create multiple tasklet worker threads.
315    pub fn cores(mut self, n: u32) -> Self {
316        self._cores = n;
317        self
318    }
319
320    /// Request a specific CPU isolation class for this lease.
321    ///
322    /// When set, the daemon emits `TLV_LEASE_CPU_ISOLATION` (0x0902) on the
323    /// `LEASE_ALLOC` request.  When omitted, the daemon's
324    /// `--cpu-isolation-policy` default applies.
325    ///
326    /// # Examples
327    ///
328    /// ```rust
329    /// use grafos_std::cpu::{CpuBuilder, CpuIsolationClass};
330    ///
331    /// # grafos_std::host::reset_mock();
332    /// let lease = CpuBuilder::new()
333    ///     .single_core()
334    ///     .isolation(CpuIsolationClass::WholeCore)
335    ///     .lease_secs(60)
336    ///     .acquire()?;
337    /// # Ok::<(), grafos_std::FabricError>(())
338    /// ```
339    pub fn isolation(mut self, class: CpuIsolationClass) -> Self {
340        self._isolation = Some(class);
341        self
342    }
343
344    /// Add an affinity constraint (toward the target).
345    ///
346    /// Multiple constraints may be added. Required constraints are hard
347    /// filters; preferred constraints boost placement scoring.
348    ///
349    /// # Examples
350    ///
351    /// ```rust
352    /// use grafos_std::cpu::CpuBuilder;
353    /// use grafos_std::affinity::{Affinity, Strength, Target};
354    ///
355    /// # grafos_std::host::reset_mock();
356    /// let lease = CpuBuilder::new()
357    ///     .single_core()
358    ///     .affinity(Affinity::new(Strength::Preferred, Target::node(42)))
359    ///     .lease_secs(60)
360    ///     .acquire()?;
361    /// # Ok::<(), grafos_std::FabricError>(())
362    /// ```
363    pub fn affinity(mut self, a: crate::affinity::Affinity) -> Self {
364        self._affinities.push(a);
365        self
366    }
367
368    /// Add an anti-affinity constraint (away from the target).
369    ///
370    /// Shorthand for `.affinity(Affinity::anti(strength, target))`.
371    pub fn anti_affinity(
372        mut self,
373        strength: crate::affinity::Strength,
374        target: crate::affinity::Target,
375    ) -> Self {
376        self._affinities
377            .push(crate::affinity::Affinity::anti(strength, target));
378        self
379    }
380
381    /// Set the lease duration in seconds.
382    pub fn lease_secs(mut self, t: u32) -> Self {
383        self._lease_secs = t;
384        self
385    }
386
387    /// Acquire a CPU lease.
388    ///
389    /// # Errors
390    ///
391    /// Returns [`crate::error::FabricError::CapacityExceeded`] or
392    /// [`crate::error::FabricError::Disconnected`] if the host cannot satisfy the request.
393    pub fn acquire(self) -> Result<CpuLease> {
394        let state = lease::new_shared_lease(LEASE_TAG_CPU, self._lease_secs as u64);
395        let lease = CpuLease {
396            state: state.clone(),
397            cpu: FabricCpu {
398                lease_state: Some(state),
399            },
400        };
401        #[cfg(feature = "observe")]
402        crate::observe_hooks::on_lease_acquired(LEASE_TAG_CPU, lease.lease_id(), "local", 0);
403        Ok(lease)
404    }
405}
406
407impl Default for CpuBuilder {
408    fn default() -> Self {
409        Self::new()
410    }
411}
412
// Unit tests for the CPU module. They drive the mock functions in
// `crate::host`; most tests call `host::reset_mock()` before configuring it.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::FabricError;
    use crate::host;

    /// A successful launch returns the exit code and output configured on the mock.
    #[test]
    fn cpu_submit_succeeds_with_mock() {
        host::reset_mock();
        host::mock_set_tasklet_result(0, b"output data");

        let lease = CpuBuilder::new()
            .single_core()
            .lease_secs(60)
            .acquire()
            .expect("acquire");
        let result = lease
            .cpu()
            .submit(&[0x00, 0x61, 0x73, 0x6d])
            .fuel(500_000)
            .input(b"test")
            .launch()
            .expect("launch");
        assert_eq!(result.exit_code, 0);
        assert_eq!(&result.output, b"output data");
    }

    /// A non-zero exit code is passed through as data, and empty output stays empty.
    #[test]
    fn cpu_submit_returns_exit_code() {
        host::reset_mock();
        host::mock_set_tasklet_result(42, &[]);

        let lease = CpuBuilder::new().acquire().expect("acquire");
        let result = lease.cpu().submit(&[0x00, 0x61]).launch().expect("launch");
        assert_eq!(result.exit_code, 42);
        assert!(result.output.is_empty());
    }

    /// A submit error injected into the mock surfaces from `launch`; the
    /// injected `-1` is expected to come back as `FabricError::Disconnected`.
    #[test]
    fn cpu_submit_error_propagates() {
        host::reset_mock();
        host::mock_set_tasklet_submit_error(Some(-1));

        let lease = CpuBuilder::new().acquire().expect("acquire");
        let result = lease.cpu().submit(&[0x00, 0x61]).launch();
        assert_eq!(result.unwrap_err(), FabricError::Disconnected);

        // Clear the injected error again once the assertion has passed.
        host::mock_set_tasklet_submit_error(None);
    }

    /// Acquiring with a multi-core request and a custom duration succeeds.
    #[test]
    fn cpu_builder_acquires() {
        host::reset_mock();
        let lease = CpuBuilder::new().cores(4).lease_secs(120).acquire();
        assert!(lease.is_ok());
    }

    /// `single_core` overrides an earlier `cores(4)`; inspects the builder's
    /// private fields directly and never touches the host mock.
    #[test]
    fn cpu_builder_single_core_sets_one_core() {
        let builder = CpuBuilder::new().cores(4).single_core().lease_secs(120);
        assert_eq!(builder._cores, 1);
        assert_eq!(builder._lease_secs, 120);
    }

    /// Launching after the mock clock has moved past the lease duration fails
    /// with `LeaseExpired`.
    #[test]
    fn cpu_lease_expiry_blocks_submit() {
        host::reset_mock();
        host::mock_set_tasklet_result(0, b"ok");
        host::mock_set_unix_time_secs(4_000);

        // 5-second lease, then advance the mock clock by 6 seconds.
        let lease = CpuBuilder::new().lease_secs(5).acquire().expect("acquire");
        host::mock_advance_time_secs(6);
        let result = lease.cpu().submit(&[0x00, 0x61]).launch();
        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);
    }
}