grafos_std/cpu.rs
1//! CPU tasklet resource module.
2//!
3//! Provides typed access to fabric CPU resources via TASKLET_SUBMIT host
4//! functions. Programs submit WASM binaries for execution on remote fabric
5//! nodes with configurable fuel limits.
6//!
7//! The underlying TASKLET_SUBMIT control op (0x0500) is implemented in
8//! fabricbiosd. On `wasm32` targets, the host functions link to real imports
9//! (`fabricbios_tasklet_v0`). On native targets, mock implementations
10//! allow testing without a runtime.
11//!
12//! # Example
13//!
14//! ```rust
15//! use grafos_std::cpu::{CpuBuilder, FabricCpu};
16//!
17//! # grafos_std::host::reset_mock();
18//! # grafos_std::host::mock_set_tasklet_result(0, b"hello");
19//! let lease = CpuBuilder::new().single_core().lease_secs(60).acquire()?;
20//! let result = lease.cpu().submit(&[0x00, 0x61, 0x73, 0x6d])
21//! .fuel(500_000)
22//! .input(b"test input")
23//! .launch()?;
24//! assert_eq!(result.exit_code, 0);
25//! assert_eq!(&result.output, b"hello");
26//! # Ok::<(), grafos_std::FabricError>(())
27//! ```
28
29extern crate alloc;
30use alloc::vec;
31use alloc::vec::Vec;
32
33use crate::error::Result;
34use crate::host;
35use crate::lease::{self, LeaseInfo, LeaseStatus, SharedLeaseState};
36
/// Resource tag identifying CPU leases. It is handed to the lease layer when
/// a lease is created and, with the `observe` feature, to the observe hooks.
const LEASE_TAG_CPU: u8 = 0x04;
38
39// Re-export the shared-memory tasklet SDK surface so users can write
40// `use grafos_std::cpu::{CoordinatorCtx, WorkerCtx, ...}`. On host targets
41// with `std`, this resolves to the host mock (which includes
42// `SharedTaskletBuilder`). On wasm32, it resolves to the real guest-side
43// implementation (which exposes `run_shared_memory_tasklet` instead of
44// the builder).
45pub use crate::cpu_shared::partition;
46
47#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
48pub use crate::cpu_shared::{
49 CoordinatorCtx, SharedRegion, SharedTaskletBuilder, SharedTaskletResult, TaskletError,
50 WorkerCtx, WorkerScratch,
51};
52
53#[cfg(target_arch = "wasm32")]
54pub use crate::cpu_shared::{
55 run_shared_memory_tasklet, Cancelled, CoordinatorCtx, SharedRegion, SharedState,
56 SharedTaskletResult, TaskletError, WorkerCtx, WorkerScratch,
57};
58
/// Outcome of a tasklet that ran to completion.
///
/// Produced by [`TaskletBuilder::launch`] when the host reports a
/// successful submission. The type implements `PartialEq`/`Eq` so callers
/// and tests can compare whole results instead of going field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskletResult {
    /// Exit code reported for the tasklet (0 typically indicates success).
    pub exit_code: u64,
    /// Output bytes captured from the tasklet; empty when none were produced.
    pub output: Vec<u8>,
}
70
71/// Safe handle to a fabric CPU resource for tasklet dispatch.
72///
73/// Provides the entry point for submitting WASM binaries as CPU tasklets.
74/// The underlying TASKLET_SUBMIT op (0x0500) handles WASM loading,
75/// fuel-limited execution, and output capture.
76///
77/// Current grafOS tasklet runtimes execute one tasklet thread per submission.
78/// Reserving more CPU cores affects available CPU capacity and placement, but
79/// does not implicitly make one tasklet invocation multi-threaded.
80pub struct FabricCpu {
81 lease_state: Option<SharedLeaseState>,
82}
83
84impl FabricCpu {
85 /// Begin building a CPU tasklet submission.
86 ///
87 /// Returns a [`TaskletBuilder`] that can be configured with fuel limits
88 /// and input data before launching.
89 ///
90 /// # Parameters
91 ///
92 /// - `wasm`: The WASM binary to execute as a tasklet.
93 pub fn submit<'a>(&'a self, wasm: &'a [u8]) -> TaskletBuilder<'a> {
94 TaskletBuilder {
95 _cpu: self,
96 wasm,
97 input: &[],
98 fuel: 1_000_000,
99 max_output: 4096,
100 }
101 }
102
103 fn ensure_active(&self) -> Result<()> {
104 if let Some(state) = &self.lease_state {
105 lease::ensure_active(state)
106 } else {
107 Ok(())
108 }
109 }
110}
111
112/// Builder for configuring and launching a CPU tasklet.
113///
114/// Configure the fuel limit and input data, then call
115/// [`launch`](TaskletBuilder::launch) to execute.
116pub struct TaskletBuilder<'a> {
117 _cpu: &'a FabricCpu,
118 wasm: &'a [u8],
119 input: &'a [u8],
120 fuel: u64,
121 max_output: usize,
122}
123
124impl<'a> TaskletBuilder<'a> {
125 /// Set the fuel limit for the tasklet.
126 ///
127 /// Fuel is consumed by WASM instructions. The tasklet will be
128 /// terminated when fuel runs out. Default is 1,000,000.
129 pub fn fuel(mut self, n: u64) -> Self {
130 self.fuel = n;
131 self
132 }
133
134 /// Set the input data for the tasklet.
135 pub fn input(mut self, data: &'a [u8]) -> Self {
136 self.input = data;
137 self
138 }
139
140 /// Set the maximum output buffer size.
141 ///
142 /// Default is 4096 bytes. Increase if the tasklet produces more output.
143 pub fn max_output(mut self, n: usize) -> Self {
144 self.max_output = n;
145 self
146 }
147
148 /// Launch the tasklet and wait for completion.
149 ///
150 /// # Errors
151 ///
152 /// Returns [`crate::error::FabricError::Disconnected`] if the host connection fails,
153 /// or another [`crate::error::FabricError`] variant based on the host status code.
154 pub fn launch(self) -> Result<TaskletResult> {
155 self._cpu.ensure_active()?;
156 #[cfg(feature = "observe")]
157 let start = std::time::Instant::now();
158 let mut output_buf = vec![0u8; self.max_output];
159 let result = host::tasklet_submit(self.wasm, self.input, self.fuel, &mut output_buf);
160 #[cfg(feature = "observe")]
161 match &result {
162 Ok(_) => {
163 let duration_us = start.elapsed().as_micros() as u64;
164 crate::observe_hooks::on_tasklet_submit();
165 crate::observe_hooks::on_op_completed(
166 grafos_observe::OpType::TaskletSubmit,
167 duration_us,
168 0,
169 );
170 }
171 Err(e) => {
172 crate::observe_hooks::on_op_error();
173 crate::observe_hooks::on_op_failed(
174 grafos_observe::OpType::TaskletSubmit,
175 &alloc::format!("{e:?}"),
176 );
177 }
178 }
179 let (exit_code, output_len) = result?;
180 output_buf.truncate(output_len);
181 Ok(TaskletResult {
182 exit_code,
183 output: output_buf,
184 })
185 }
186}
187
188/// A CPU lease that auto-frees on drop.
189///
190/// Wraps a [`FabricCpu`] handle with RAII lifecycle management.
191/// Created via [`CpuBuilder::acquire`].
192pub struct CpuLease {
193 state: SharedLeaseState,
194 cpu: FabricCpu,
195}
196
197impl CpuLease {
198 /// Access the underlying [`FabricCpu`] handle.
199 pub fn cpu(&self) -> &FabricCpu {
200 &self.cpu
201 }
202
203 /// Lease metadata snapshot (id, creation time, expiry, and status).
204 pub fn info(&self) -> LeaseInfo {
205 lease::info(&self.state)
206 }
207
208 /// Unique lease identifier.
209 pub fn lease_id(&self) -> u128 {
210 lease::lease_id(&self.state)
211 }
212
213 /// Lease creation timestamp (unix seconds).
214 pub fn created_at_unix_secs(&self) -> u64 {
215 lease::created_at_unix_secs(&self.state)
216 }
217
218 /// Lease expiry timestamp (unix seconds).
219 pub fn expires_at_unix_secs(&self) -> u64 {
220 lease::expires_at_unix_secs(&self.state)
221 }
222
223 /// Current lease status.
224 pub fn status(&self) -> LeaseStatus {
225 lease::status(&self.state)
226 }
227
228 /// Renew the lease TTL by `duration_secs`.
229 pub fn renew(&self, duration_secs: u64) -> Result<()> {
230 lease::renew(&self.state, duration_secs)
231 }
232
233 /// Explicitly revoke/free this lease.
234 pub fn free(&self) {
235 lease::free(&self.state);
236 }
237}
238
239impl Drop for CpuLease {
240 fn drop(&mut self) {
241 #[cfg(feature = "observe")]
242 crate::observe_hooks::on_lease_dropped(LEASE_TAG_CPU, lease::lease_id(&self.state));
243 lease::free(&self.state);
244 }
245}
246
247/// Builder for acquiring a fabric CPU lease.
248///
249/// Allows specifying reserved CPU capacity and lease duration.
250///
251/// `cores(n)` reserves CPU capacity for the lease. In current tasklet
252/// runtimes, execution width is still one tasklet thread per invocation unless
253/// a future profile version defines otherwise.
254///
255/// # Examples
256///
257/// ```rust
258/// use grafos_std::cpu::CpuBuilder;
259///
260/// # grafos_std::host::reset_mock();
261/// let lease = CpuBuilder::new().single_core().lease_secs(120).acquire()?;
262/// // lease.cpu() is available for submit() calls
263/// # Ok::<(), grafos_std::FabricError>(())
264/// ```
265pub struct CpuBuilder {
266 _cores: u32,
267 _lease_secs: u32,
268 _isolation: Option<CpuIsolationClass>,
269 _affinities: Vec<crate::affinity::Affinity>,
270}
271
/// Isolation class requested for a single CPU lease.
///
/// SDK-level mirror of
/// `fabricbios_core::lease_cpu_isolation::CpuIsolationClass`. The wire format
/// is described in `docs/spec/cpu-isolation-wire-format.md` and the semantics
/// in `docs/spec/resource-isolation-and-exclusivity.md` §4.2.
///
/// When passed to [`CpuBuilder::isolation`], the runtime emits
/// `TLV_LEASE_CPU_ISOLATION` (0x0902) on the `LEASE_ALLOC` request.
/// Unsupported classes fail closed — the node rejects the lease.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CpuIsolationClass {
    /// Default — no pinning beyond cgroup quota.
    BestEffort,
    /// The lease owns a full core; SMT siblings are not shared.
    WholeCore,
    /// `WholeCore` plus topology/NUMA constraints.
    StrictIsolated,
}
291
292impl CpuBuilder {
293 /// Create a new builder with defaults (1 core, 300s lease, no isolation
294 /// preference โ daemon default applies).
295 pub fn new() -> Self {
296 CpuBuilder {
297 _cores: 1,
298 _lease_secs: 300,
299 _isolation: None,
300 _affinities: Vec::new(),
301 }
302 }
303
304 /// Convenience for the common case: reserve one CPU core for a
305 /// single-threaded tasklet.
306 pub fn single_core(mut self) -> Self {
307 self._cores = 1;
308 self
309 }
310
311 /// Set the number of CPU cores requested.
312 ///
313 /// This reserves CPU capacity for the lease. It does not implicitly set
314 /// tasklet execution width or create multiple tasklet worker threads.
315 pub fn cores(mut self, n: u32) -> Self {
316 self._cores = n;
317 self
318 }
319
320 /// Request a specific CPU isolation class for this lease.
321 ///
322 /// When set, the daemon emits `TLV_LEASE_CPU_ISOLATION` (0x0902) on the
323 /// `LEASE_ALLOC` request. When omitted, the daemon's
324 /// `--cpu-isolation-policy` default applies.
325 ///
326 /// # Examples
327 ///
328 /// ```rust
329 /// use grafos_std::cpu::{CpuBuilder, CpuIsolationClass};
330 ///
331 /// # grafos_std::host::reset_mock();
332 /// let lease = CpuBuilder::new()
333 /// .single_core()
334 /// .isolation(CpuIsolationClass::WholeCore)
335 /// .lease_secs(60)
336 /// .acquire()?;
337 /// # Ok::<(), grafos_std::FabricError>(())
338 /// ```
339 pub fn isolation(mut self, class: CpuIsolationClass) -> Self {
340 self._isolation = Some(class);
341 self
342 }
343
344 /// Add an affinity constraint (toward the target).
345 ///
346 /// Multiple constraints may be added. Required constraints are hard
347 /// filters; preferred constraints boost placement scoring.
348 ///
349 /// # Examples
350 ///
351 /// ```rust
352 /// use grafos_std::cpu::CpuBuilder;
353 /// use grafos_std::affinity::{Affinity, Strength, Target};
354 ///
355 /// # grafos_std::host::reset_mock();
356 /// let lease = CpuBuilder::new()
357 /// .single_core()
358 /// .affinity(Affinity::new(Strength::Preferred, Target::node(42)))
359 /// .lease_secs(60)
360 /// .acquire()?;
361 /// # Ok::<(), grafos_std::FabricError>(())
362 /// ```
363 pub fn affinity(mut self, a: crate::affinity::Affinity) -> Self {
364 self._affinities.push(a);
365 self
366 }
367
368 /// Add an anti-affinity constraint (away from the target).
369 ///
370 /// Shorthand for `.affinity(Affinity::anti(strength, target))`.
371 pub fn anti_affinity(
372 mut self,
373 strength: crate::affinity::Strength,
374 target: crate::affinity::Target,
375 ) -> Self {
376 self._affinities
377 .push(crate::affinity::Affinity::anti(strength, target));
378 self
379 }
380
381 /// Set the lease duration in seconds.
382 pub fn lease_secs(mut self, t: u32) -> Self {
383 self._lease_secs = t;
384 self
385 }
386
387 /// Acquire a CPU lease.
388 ///
389 /// # Errors
390 ///
391 /// Returns [`crate::error::FabricError::CapacityExceeded`] or
392 /// [`crate::error::FabricError::Disconnected`] if the host cannot satisfy the request.
393 pub fn acquire(self) -> Result<CpuLease> {
394 let state = lease::new_shared_lease(LEASE_TAG_CPU, self._lease_secs as u64);
395 let lease = CpuLease {
396 state: state.clone(),
397 cpu: FabricCpu {
398 lease_state: Some(state),
399 },
400 };
401 #[cfg(feature = "observe")]
402 crate::observe_hooks::on_lease_acquired(LEASE_TAG_CPU, lease.lease_id(), "local", 0);
403 Ok(lease)
404 }
405}
406
407impl Default for CpuBuilder {
408 fn default() -> Self {
409 Self::new()
410 }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::FabricError;
    use crate::host;

    // Two-byte stand-in binary for tests where the WASM content is irrelevant.
    const STUB_WASM: [u8; 2] = [0x00, 0x61];

    // Acquire a lease with the builder defaults, panicking on failure.
    fn default_lease() -> CpuLease {
        CpuBuilder::new().acquire().expect("acquire")
    }

    /// A successful mock submission surfaces the exit code and output bytes.
    #[test]
    fn cpu_submit_succeeds_with_mock() {
        host::reset_mock();
        host::mock_set_tasklet_result(0, b"output data");

        let builder = CpuBuilder::new().single_core().lease_secs(60);
        let lease = builder.acquire().expect("acquire");
        let submission = lease
            .cpu()
            .submit(&[0x00, 0x61, 0x73, 0x6d])
            .fuel(500_000)
            .input(b"test");
        let result = submission.launch().expect("launch");

        assert_eq!(result.exit_code, 0);
        assert_eq!(&result.output, b"output data");
    }

    /// A non-zero exit code is passed through unchanged, with empty output.
    #[test]
    fn cpu_submit_returns_exit_code() {
        host::reset_mock();
        host::mock_set_tasklet_result(42, &[]);

        let lease = default_lease();
        let result = lease.cpu().submit(&STUB_WASM).launch().expect("launch");

        assert_eq!(result.exit_code, 42);
        assert!(result.output.is_empty());
    }

    /// A host-side submit error is returned to the caller as a `FabricError`.
    #[test]
    fn cpu_submit_error_propagates() {
        host::reset_mock();
        host::mock_set_tasklet_submit_error(Some(-1));

        let lease = default_lease();
        let result = lease.cpu().submit(&STUB_WASM).launch();
        assert_eq!(result.unwrap_err(), FabricError::Disconnected);

        // Clear the injected error again once the assertion has passed.
        host::mock_set_tasklet_submit_error(None);
    }

    /// Requesting several cores still yields a lease.
    #[test]
    fn cpu_builder_acquires() {
        host::reset_mock();
        let lease = CpuBuilder::new().cores(4).lease_secs(120).acquire();
        assert!(lease.is_ok());
    }

    /// `single_core` overrides an earlier `cores(n)` call.
    #[test]
    fn cpu_builder_single_core_sets_one_core() {
        let builder = CpuBuilder::new().cores(4).single_core().lease_secs(120);
        assert_eq!(builder._cores, 1);
        assert_eq!(builder._lease_secs, 120);
    }

    /// Submitting after the lease TTL has elapsed fails with `LeaseExpired`.
    #[test]
    fn cpu_lease_expiry_blocks_submit() {
        host::reset_mock();
        host::mock_set_tasklet_result(0, b"ok");
        host::mock_set_unix_time_secs(4_000);

        let lease = CpuBuilder::new().lease_secs(5).acquire().expect("acquire");
        // Move the mock clock past the 5-second TTL.
        host::mock_advance_time_secs(6);

        let result = lease.cpu().submit(&STUB_WASM).launch();
        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);
    }
}