grafos_std/cpu_shared/host_mock.rs
//! Shared-memory tasklet SDK surface — host mock (Phase 48.3 P3, Phase 48.14).
//!
//! This module exposes the *programmer-facing* types for the shared-memory
//! tasklet execution mode: a builder, a coordinator context, a worker
//! context, typed views over the shared region and per-worker scratch, and
//! partition helpers.
//!
//! # Source portability (Phase 48.14)
//!
//! The host mock and the wasm32 guest ([`super::guest`]) deliberately do
//! NOT present a single source-portable surface. They present **three
//! levels** of parity, documented in detail in
//! [`docs/grafos/shared-memory-tasklet-programming-model.md`][portability]:
//!
//! 1. **Behavioral portability** is the primary promise. The fidelity
//!    test `shared_memory_host_wasm_fidelity_e2e` guarantees that the
//!    same logical workload produces byte-identical output on both
//!    targets.
//! 2. **Source portability** is a narrower documented subset. Methods on
//!    the safe-list compile and behave identically on both targets. The
//!    cfg-list and behavior-divergent sub-list do NOT.
//! 3. **`#[cfg]` is the honest tool outside that subset.** Methods
//!    tagged with `**Target-specific.**` below require per-target
//!    source in programs that otherwise compile for both targets.
//!
//! Methods carrying the `**Target-specific.**` rustdoc tag are on the
//! cfg-list. Methods carrying `**Behavior diverges across targets.**`
//! compile on both targets but mean different things (see
//! [`CoordinatorCtx::barrier`] and [`WorkerCtx::barrier`]).
//!
//! [portability]: https://example.invalid
//! (See `docs/grafos/shared-memory-tasklet-programming-model.md#source-portability-between-host-mock-and-wasm32-guest`)
//!
//! ## Coordinator-lane programming model
//!
//! Phase 48.3 — the host mock implements the same coordinator-lane model
//! as the wasm32 guest path in [`super::guest`]: worker 0 is the
//! coordinator lane and runs only the coordinator closure;
//! [`CoordinatorCtx::parallel_for_workers`] spawns the worker body on
//! data worker lanes `1..worker_count()` (skipping worker 0). Source
//! compiled for either target produces identical observable behavior
//! under the `SharedMemory` execution mode, so a programmer can
//! prototype against the host mock and deploy unchanged on a real
//! fabricBIOS runtime. The `data_worker_index() / data_worker_count() /
//! data_scratch(i)` helpers expose the data-plane lane space for
//! partitioning, while the raw `worker_index() / worker_count() /
//! scratch(i)` accessors keep "raw wasm worker space" semantics for
//! advanced cases. A short partitioning sketch follows.
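//!
//! A minimal sketch of data-plane partitioning against those helpers
//! (`ignore`-fenced: it elides the builder setup shown in the full
//! example below, and the integer widths accepted by the
//! `partition::tiles` helper are assumed from this module's unit
//! tests, not restated authoritatively):
//!
//! ```ignore
//! coord.parallel_for_workers(|w| {
//!     // Tile a 64x64 image into 16x16 tiles across the data lanes.
//!     let (wi, wc) = (w.data_worker_index() as u16, w.data_worker_count() as u16);
//!     for t in partition::tiles(64, 64, 16, 16, wi, wc) {
//!         // ... render pixels t.x..t.x + t.w, t.y..t.y + t.h ...
//!     }
//! }).unwrap();
//! ```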
//!
//! ## What this is
//!
//! - The types developers will use to write a shared-memory tasklet.
//! - A host-mock launch path so the SDK is unit-testable without a runtime.
//! - The companion ABI declaration lives in [`crate::grafos_worker_v0`].
//!
//! ## What this is NOT
//!
//! - This is not the runtime. The wasmtime/wasmi-backed runtime that
//!   actually executes shared-memory tasklets is delivered as part of the
//!   P1 track. Here, `launch()` runs the coordinator and worker closures
//!   directly on the host using a `std::thread::scope` plus a real
//!   `std::sync::Barrier`, purely so the SDK abstractions can be exercised
//!   in unit tests.
//! - The programmer-facing API never mentions wasmi, wasmtime, or "threads".
//!   The abstraction is *coordinator + workers* with explicit shared and
//!   per-worker state.
//!
//! ## Programming model
//!
//! ```no_run
//! use grafos_std::cpu::{CpuBuilder, CoordinatorCtx, WorkerCtx};
//! use std::sync::atomic::{AtomicU32, Ordering};
//!
//! #[derive(Default)]
//! struct Shared {
//!     counter: AtomicU32,
//! }
//! #[derive(Default, Clone)]
//! struct Scratch {
//!     local: u32,
//! }
//!
//! # grafos_std::host::reset_mock();
//! let lease = CpuBuilder::new().cores(4).acquire().unwrap();
//! let _ = lease.cpu().shared_memory_tasklet::<Shared, Scratch>()
//!     .cores(4)
//!     .workers(4)
//!     .shared_bytes(1024)
//!     .scratch_bytes_per_worker(64)
//!     .fuel(1_000_000)
//!     .launch(|coord: &mut CoordinatorCtx<Shared, Scratch>| {
//!         // Coordinator: load input, orchestrate phases, emit output.
//!         coord.parallel_for_workers(|w| {
//!             w.scratch_mut().local = w.worker_index() as u32;
//!             w.shared().counter.fetch_add(1, Ordering::SeqCst);
//!         }).unwrap();
//!         let n = coord.shared().counter.load(Ordering::SeqCst);
//!         coord.set_output(&n.to_le_bytes());
//!         Ok(())
//!     });
//! ```

extern crate alloc;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use crate::cpu::FabricCpu;
use crate::error::FabricError;
use crate::grafos_worker_v0::mock::{self, with_worker_slot, WorkerSlot};

/// Phase 48.13 W2b — minimum precharge slice per worker. Mirrors the
/// wasmtime runtime constant in the shared-memory dispatcher so the host
/// mock and the real runtime agree on the precharge math.
const MIN_INITIAL_SLICE: u64 = 4096;

/// Result of a completed shared-memory tasklet.
#[derive(Debug, Clone)]
pub struct SharedTaskletResult {
    /// Exit code (0 = success).
    pub exit_code: u64,
    /// Output bytes emitted by the coordinator.
    pub output: Vec<u8>,
}

/// Errors produced by the shared-memory tasklet SDK surface.
///
/// These are SDK-side validation errors, distinct from the underlying
/// transport errors in [`FabricError`]. A successful submission may still
/// surface a [`FabricError`] from the runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskletError {
    /// `max_threads > num_cores`, either is zero, or `max_threads < 2`
    /// (a shared-memory tasklet needs a coordinator lane plus at least
    /// one data worker lane).
    InvalidWorkerCount,
    /// Memory envelope `shared + max_threads * scratch_per_worker` overflowed
    /// or exceeded the configured tasklet linear-memory budget.
    MemoryEnvelopeInvalid,
    /// A required builder field was not set.
    Missing(&'static str),
    /// The whole tasklet was cancelled (revoke / expiry / explicit cancel).
    Cancelled,
    /// The lease-wide shared fuel pool is exhausted (Phase 48.14 — folds
    /// Phase 48.3 follow-up #35). Produced by `?`-propagation from
    /// [`CoordinatorCtx::fuel_checkpoint`] / [`WorkerCtx::fuel_checkpoint`]
    /// via the [`From<FuelExhausted>`] impl below, so programmer source
    /// can write `ctx.fuel_checkpoint(n)?` and propagate a typed fuel
    /// error without collapsing it into [`TaskletError::Cancelled`].
    FuelExhausted,
    /// Underlying transport / host error.
    Fabric(FabricError),
}

impl From<FabricError> for TaskletError {
    fn from(e: FabricError) -> Self {
        TaskletError::Fabric(e)
    }
}

impl From<FuelExhausted> for TaskletError {
    fn from(_: FuelExhausted) -> Self {
        TaskletError::FuelExhausted
    }
}

impl From<Cancelled> for TaskletError {
    fn from(_: Cancelled) -> Self {
        TaskletError::Cancelled
    }
}

/// Returned by [`WorkerCtx::barrier`] / [`CoordinatorCtx::barrier`] when
/// the tasklet is cancelled while waiting on the barrier.
///
/// This mirrors [`super::guest::Cancelled`] on the wasm32 guest so that
/// source code written as `w.barrier()?` compiles unchanged against
/// either target. The host mock currently models cancellation with the
/// same `Arc<AtomicBool>` that backs [`CoordinatorCtx::cancel`] and
/// [`WorkerCtx::cancelled`]: after the underlying `std::sync::Barrier`
/// releases, we re-check the flag and surface `Err(Cancelled)` if set.
/// This is not a fully faithful simulation of a racing cancel mid-wait
/// — the mock cannot unstick a `std::sync::Barrier` that never
/// reaches quorum — but it is sufficient for the "programmer writes
/// `barrier()?` and the type checks" source-parity goal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Cancelled;

/// Returned by [`WorkerCtx::fuel_checkpoint`] /
/// [`CoordinatorCtx::fuel_checkpoint`] when the lease-wide shared fuel
/// pool is exhausted (or, defensively, when called by a V1 worker that
/// has no shared pool installed).
///
/// This mirrors [`super::guest::FuelExhausted`] on the wasm32 guest so
/// source written as `ctx.fuel_checkpoint(n)?` compiles unchanged
/// against either target. Phase 48.13 W2b: workers MUST stop computing
/// immediately on `Err(FuelExhausted)` — there is no recovery path; a
/// subsequent fuel-consuming instruction will trap fail-closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FuelExhausted;

impl core::fmt::Display for FuelExhausted {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("shared fuel pool exhausted")
    }
}

// ---------------------------------------------------------------------------
// Builder
// ---------------------------------------------------------------------------

/// Builder for a shared-memory tasklet.
///
/// Construct via [`FabricCpu::shared_memory_tasklet`]. The builder is
/// generic over the programmer's shared-state type `S` and per-worker
/// scratch type `W`. These are the types that the coordinator and workers
/// see in their respective contexts.
///
/// The builder is intentionally distinct from [`crate::cpu::TaskletBuilder`]
/// (the single-threaded path) so that "more cores" never silently means
/// "more threads": shared-memory execution is an explicit, named mode.
pub struct SharedTaskletBuilder<'a, S, W>
where
    S: Default + Send + Sync + 'static,
    W: Default + Send + 'static,
{
    _cpu: &'a FabricCpu,
    num_cores: Option<u16>,
    max_threads: Option<u16>,
    shared_bytes: Option<u32>,
    scratch_bytes_per_worker: Option<u32>,
    max_linear_memory: u32,
    fuel: u64,
    /// Phase 48.13 W2b — optional V2 shared-pool max fuel. When set, the
    /// host mock constructs an `Arc<AtomicU64>` shared pool initialized
    /// to this value, precharges each worker's slice the same way the
    /// wasmtime runtime does, and installs the pool into every worker's
    /// `WorkerSlot`. When `None`, no pool is installed and the
    /// `fuel_checkpoint` SDK methods return `Err(FuelExhausted)` (V1
    /// behavior).
    max_fuel: Option<u64>,
    input: Vec<u8>,
    _phantom: PhantomData<(S, W)>,
}

impl<'a, S, W> SharedTaskletBuilder<'a, S, W>
where
    S: Default + Send + Sync + 'static,
    W: Default + Send + 'static,
{
    fn new(cpu: &'a FabricCpu) -> Self {
        Self {
            _cpu: cpu,
            num_cores: None,
            max_threads: None,
            shared_bytes: None,
            scratch_bytes_per_worker: None,
            max_linear_memory: u32::MAX,
            fuel: 1_000_000,
            max_fuel: None,
            input: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Number of CPU cores reserved for this tasklet (`num_cores`).
    pub fn cores(mut self, n: u16) -> Self {
        self.num_cores = Some(n);
        self
    }

    /// Number of worker lanes the runtime will launch (`max_threads`).
    /// Must satisfy `workers <= cores`.
    pub fn workers(mut self, n: u16) -> Self {
        self.max_threads = Some(n);
        self
    }

    /// Size of the shared region in bytes.
    pub fn shared_bytes(mut self, n: u32) -> Self {
        self.shared_bytes = Some(n);
        self
    }

    /// Size of each worker's private scratch region in bytes.
    pub fn scratch_bytes_per_worker(mut self, n: u32) -> Self {
        self.scratch_bytes_per_worker = Some(n);
        self
    }

    /// Override the per-tasklet linear-memory budget. Defaults to `u32::MAX`.
    /// `shared + workers * scratch_per_worker` must fit within this.
    pub fn max_linear_memory(mut self, n: u32) -> Self {
        self.max_linear_memory = n;
        self
    }

    /// Total fuel budget shared across coordinator and all workers.
    pub fn fuel(mut self, n: u64) -> Self {
        self.fuel = n;
        self
    }

    /// Phase 48.13 W2b — opt this builder into the V2 shared-pool fuel
    /// model with `max_fuel` units in the lease-wide pool.
    ///
    /// When set, [`launch`](Self::launch) constructs an
    /// `Arc<AtomicU64>` initialized to `max_fuel` and installs it into
    /// each spawned worker's `WorkerSlot`, plus the coordinator's slot
    /// during the launch lifetime. The
    /// [`fuel_checkpoint`](WorkerCtx::fuel_checkpoint) SDK methods then
    /// CAS-decrement the pool the same way the wasmtime runtime does.
    ///
    /// When unset (the default), no pool is installed and
    /// `fuel_checkpoint` returns `Err(FuelExhausted)` — this preserves
    /// the V1 behavior of all existing tests, which never call
    /// `fuel_checkpoint` and so observe identical behavior.
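    ///
    /// A minimal sketch of the V2 opt-in (`ignore`-fenced; `builder`
    /// stands for an otherwise fully configured [`SharedTaskletBuilder`],
    /// and the numbers are chosen to make the precharge arithmetic
    /// visible; see the `fuel_checkpoint_succeeds_until_pool_exhausted`
    /// test below):
    ///
    /// ```ignore
    /// // max_fuel = 20_000, workers = 2:
    /// //   slice = max(20_000 / 2 / 4, MIN_INITIAL_SLICE)
    /// //         = max(2_500, 4_096) = 4_096
    /// //   pool after precharge = 20_000 - 2 * 4_096 = 11_808
    /// let result = builder.with_max_fuel(20_000).launch(|coord| {
    ///     coord.fuel_checkpoint(1024)?; // debits the shared pool
    ///     Ok(())
    /// })?;
    /// ```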
    pub fn with_max_fuel(mut self, max_fuel: u64) -> Self {
        self.max_fuel = Some(max_fuel);
        self
    }

    /// Input bytes the coordinator will read from shared memory.
    pub fn input(mut self, bytes: Vec<u8>) -> Self {
        self.input = bytes;
        self
    }

    fn validate(&self) -> core::result::Result<SharedMemoryEnvelope, TaskletError> {
        let cores = self.num_cores.ok_or(TaskletError::Missing("cores"))?;
        let workers = self.max_threads.ok_or(TaskletError::Missing("workers"))?;
        let shared = self
            .shared_bytes
            .ok_or(TaskletError::Missing("shared_bytes"))?;
        let scratch = self
            .scratch_bytes_per_worker
            .ok_or(TaskletError::Missing("scratch_bytes_per_worker"))?;

        if cores == 0 || workers == 0 || workers > cores {
            return Err(TaskletError::InvalidWorkerCount);
        }
        // Phase 48.3 — shared-memory tasklets need a coordinator lane
        // (worker 0) AND at least one data worker lane (worker 1..N).
        // A single-lane shared-memory tasklet has no data workers and
        // can't make progress; reject up front.
        if workers < 2 {
            return Err(TaskletError::InvalidWorkerCount);
        }
        // shared + workers * scratch, with an overflow check.
        let scratch_total = (workers as u64).checked_mul(scratch as u64);
        let total = scratch_total.and_then(|s| s.checked_add(shared as u64));
        let total = total.ok_or(TaskletError::MemoryEnvelopeInvalid)?;
        if total > self.max_linear_memory as u64 {
            return Err(TaskletError::MemoryEnvelopeInvalid);
        }
        Ok(SharedMemoryEnvelope {
            workers,
            shared_bytes: shared,
            scratch_bytes_per_worker: scratch,
        })
    }

    /// **Target-specific.** Launch the tasklet on the host mock. This
    /// entry point is on the cfg-list: the wasm32 guest's equivalent is
    /// [`super::guest::run_shared_memory_tasklet`], which has a different
    /// shape (it is called BY the runtime, not BY the program, and it
    /// takes both a coordinator closure and a worker closure because
    /// every worker lane re-enters `tasklet_run`).
    ///
    /// Takes a single coordinator closure. The host mock drives data
    /// workers from inside
    /// [`CoordinatorCtx::parallel_for_workers`], so there is no separate
    /// worker closure at launch time. Phase 48.14 removed the previous
    /// vestigial `_worker` parameter — it was never called on host and
    /// only masked the genuine cross-target shape difference.
    ///
    /// The `coordinator` closure signature
    /// (`FnOnce(&mut CoordinatorCtx<S, W>) -> Result<(), TaskletError>`)
    /// is source-portable with the coordinator closure passed to
    /// `run_shared_memory_tasklet` on wasm32, so the closure *body* can
    /// live in a shared module while the two outer entry points live
    /// under `#[cfg(target_arch = "wasm32")]` gates, as sketched below.
    /// See the "Cross-target source patterns" section of
    /// `docs/grafos/shared-memory-tasklet-build-guide.md`.
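    ///
    /// A minimal sketch of that pattern (`ignore`-fenced; `Shared` and
    /// `Scratch` are the example types from the module docs, and the
    /// wasm32 arm only paraphrases the guest entry point's documented
    /// shape rather than restating its exact signature):
    ///
    /// ```ignore
    /// // Shared, target-independent coordinator logic.
    /// fn coordinator_body(coord: &mut CoordinatorCtx<Shared, Scratch>) -> Result<(), TaskletError> {
    ///     // ... orchestrate phases, set output ...
    ///     Ok(())
    /// }
    ///
    /// #[cfg(not(target_arch = "wasm32"))]
    /// fn run(builder: SharedTaskletBuilder<'_, Shared, Scratch>) {
    ///     let _ = builder.launch(coordinator_body);
    /// }
    ///
    /// #[cfg(target_arch = "wasm32")]
    /// fn run() {
    ///     // super::guest::run_shared_memory_tasklet takes the same
    ///     // coordinator closure plus a separate worker closure,
    ///     // because every lane re-enters `tasklet_run`.
    /// }
    /// ```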
    pub fn launch<FCoord>(
        self,
        coordinator: FCoord,
    ) -> core::result::Result<SharedTaskletResult, TaskletError>
    where
        FCoord: FnOnce(&mut CoordinatorCtx<S, W>) -> core::result::Result<(), TaskletError>,
    {
        let env = self.validate()?;

        // Construct the shared state and per-worker scratch on the host.
        // The runtime will eventually back these by linear memory; here we
        // back them by host-side typed values so SDK tests can exercise the
        // ctx APIs end to end.
        let shared = Arc::new(S::default());
        let mut scratches: Vec<W> = (0..env.workers).map(|_| W::default()).collect();
        let cancelled = Arc::new(AtomicBool::new(false));
        let input: Arc<Vec<u8>> = Arc::new(self.input);

        // Phase 48.13 W2b — V2 shared-pool fuel construction. If the
        // builder opted in via `with_max_fuel`, build the pool now and
        // precharge each worker's slice the same way the wasmtime
        // runtime does in `fabricbiosd`'s shared-memory dispatcher:
        //
        //     slice = max(max_fuel / max_threads / 4, MIN_INITIAL_SLICE)
        //
        // The slice is debited from the pool atomically. The mock
        // doesn't actually consume host-side native fuel — it only
        // tracks the logical pool state — but the precharge math
        // matches so SDK fidelity tests can compare against the runtime.
        let shared_fuel_pool: Option<Arc<AtomicU64>> = self.max_fuel.map(|max_fuel| {
            let pool = Arc::new(AtomicU64::new(max_fuel));
            let workers = env.workers as u64;
            let slice = core::cmp::max(max_fuel / workers / 4, MIN_INITIAL_SLICE);
            // Precharge per worker. Saturating at the pool floor preserves
            // the runtime invariant that we never debit below zero.
            for _ in 0..workers {
                let mut current = pool.load(Ordering::SeqCst);
                loop {
                    let take = core::cmp::min(slice, current);
                    if take == 0 {
                        break;
                    }
                    match pool.compare_exchange_weak(
                        current,
                        current - take,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => break,
                        Err(observed) => current = observed,
                    }
                }
            }
            pool
        });

        let mut coord = CoordinatorCtx {
            shared: shared.clone(),
            scratches: &mut scratches,
            workers: env.workers,
            shared_bytes: env.shared_bytes,
            scratch_bytes_per_worker: env.scratch_bytes_per_worker,
            cancelled: cancelled.clone(),
            input,
            output: Vec::new(),
            shared_fuel_pool: shared_fuel_pool.clone(),
        };

        // Install a coordinator-thread WorkerSlot for the duration of
        // the coordinator closure so `coord.fuel_checkpoint(n)` can
        // observe the shared pool through the same `fb_fuel_checkpoint`
        // shim that workers use. This mirrors wasm32 where worker 0 (the
        // coordinator lane) has its own runtime slot.
        let coord_slot = WorkerSlot {
            worker_index: 0,
            worker_count: env.workers as i32,
            cancelled: cancelled.clone(),
            barrier: None,
            shared_ptr: 0,
            shared_len: 0,
            scratch_ptr: 0,
            scratch_len: 0,
            shared_fuel_pool: shared_fuel_pool.clone(),
        };
        let coord_result = with_worker_slot(coord_slot, || coordinator(&mut coord));
        coord_result?;
        if cancelled.load(Ordering::SeqCst) {
            return Err(TaskletError::Cancelled);
        }
        let output = core::mem::take(&mut coord.output);
        Ok(SharedTaskletResult {
            exit_code: 0,
            output,
        })
    }
}

#[derive(Clone, Copy)]
struct SharedMemoryEnvelope {
    workers: u16,
    shared_bytes: u32,
    scratch_bytes_per_worker: u32,
}

impl FabricCpu {
    /// Begin building a shared-memory tasklet.
    ///
    /// This is a distinct entry point from [`FabricCpu::submit`]. Reserving
    /// more CPU cores via [`crate::cpu::CpuBuilder::cores`] does not by
    /// itself produce a multi-worker tasklet — only this builder does.
    pub fn shared_memory_tasklet<S, W>(&self) -> SharedTaskletBuilder<'_, S, W>
    where
        S: Default + Send + Sync + 'static,
        W: Default + Send + 'static,
    {
        SharedTaskletBuilder::new(self)
    }
}

// ---------------------------------------------------------------------------
// CoordinatorCtx
// ---------------------------------------------------------------------------

/// Programmer-facing coordinator context.
///
/// The coordinator is the only role that may perform authority-bearing
/// hostcalls (FBMU, FBBU, leases, services, logging). Workers may compute
/// on shared state and their own scratch, but must not call into those
/// host modules.
pub struct CoordinatorCtx<'a, S, W>
where
    S: Send + Sync + 'static,
    W: Send + 'static,
{
    shared: Arc<S>,
    scratches: &'a mut Vec<W>,
    workers: u16,
    shared_bytes: u32,
    scratch_bytes_per_worker: u32,
    cancelled: Arc<AtomicBool>,
    input: Arc<Vec<u8>>,
    /// Output bytes written by the coordinator via `set_output`. Phase
    /// 48.3 — cross-target API parity with the wasm32 guest: programmer
    /// writes `coord.set_output(&bytes); Ok(())` on both targets, and the
    /// host mock's `launch` surfaces this vec to the caller.
    output: Vec<u8>,
    /// Phase 48.13 W2b — V2 lease-wide shared fuel pool. `None` for V1
    /// (per-worker-slice) launches; `Some` when the builder opted in via
    /// `with_max_fuel`. Cloned into each worker's WorkerSlot in
    /// `parallel_for_workers`.
    shared_fuel_pool: Option<Arc<AtomicU64>>,
}

impl<'a, S, W> CoordinatorCtx<'a, S, W>
where
    S: Send + Sync + 'static,
    W: Send + 'static,
{
    /// Number of worker lanes that will run.
    pub fn worker_count(&self) -> u16 {
        self.workers
    }

    /// Whether the tasklet has been cancelled.
    pub fn cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }

    /// **Target-specific.** Bytes available in the shared region (host
    /// mock only; the wasm32 guest exposes length only through
    /// [`SharedRegion::len_bytes`]).
    pub fn shared_bytes(&self) -> u32 {
        self.shared_bytes
    }

    /// **Target-specific.** Bytes available in each worker's scratch
    /// region (host mock only).
    pub fn scratch_bytes_per_worker(&self) -> u32 {
        self.scratch_bytes_per_worker
    }

    /// Input bytes provided to the tasklet at submission time.
    pub fn input(&self) -> &[u8] {
        &self.input
    }

    /// Read access to shared state.
    pub fn shared(&self) -> &S {
        &self.shared
    }

    /// Read-only access to a specific worker's scratch slot.
    ///
    /// # Safety / discipline
    ///
    /// Only call this AFTER a barrier that the worker has reached. Reading
    /// a worker's scratch before that worker has finished writing to it is
    /// a race the SDK cannot detect. Typical pattern: workers write into
    /// their scratch slots and then call [`WorkerCtx::barrier`]; the
    /// coordinator calls its own barrier and only then reads
    /// `coord.scratch(i)` for each worker.
    ///
    /// Worker scratch is NOT a general shared-memory channel between
    /// workers. For ongoing communication during a phase, use the `Shared`
    /// region with atomics. Worker scratch is for results that get
    /// collected by the coordinator at well-defined phase boundaries.
    ///
    /// Takes `u32` to match the wasm32 guest signature so the same
    /// source compiles unchanged against both targets.
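    ///
    /// A minimal sketch of the collect-at-a-phase-boundary pattern
    /// (`ignore`-fenced; `expensive_compute` is a hypothetical
    /// placeholder, and on the host mock the `parallel_for_workers`
    /// join itself is the barrier):
    ///
    /// ```ignore
    /// coord.parallel_for_workers(|w| {
    ///     // Hypothetical per-worker computation.
    ///     w.scratch_mut().local = expensive_compute(w);
    /// })?;
    /// // Every data worker has joined, so these reads are race-free.
    /// let mut sum = 0u32;
    /// for i in 0..coord.data_worker_count() {
    ///     sum += coord.data_scratch(i).local;
    /// }
    /// ```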
    pub fn scratch(&self, worker_index: u32) -> &W {
        &self.scratches[worker_index as usize]
    }

    /// **Target-specific.** Mutable access to a single worker's scratch
    /// (coordinator-only). The host mock takes a worker index because
    /// the coordinator owns `&mut` references to every lane's scratch
    /// slot; the wasm32 guest's `CoordinatorCtx::scratch_mut` takes no
    /// arguments and returns worker 0's scratch slot only. Portable
    /// source that needs cross-lane mutable scratch access must
    /// `#[cfg]`-gate this call.
    pub fn scratch_mut(&mut self, worker_index: u32) -> &mut W {
        &mut self.scratches[worker_index as usize]
    }

    /// Number of data workers (excluding the coordinator lane).
    ///
    /// Equal to `worker_count() - 1`. Mirrors the wasm32 guest helper
    /// of the same name; programmer code calling this compiles and runs
    /// identically on both targets.
    pub fn data_worker_count(&self) -> u32 {
        (self.workers as u32) - 1
    }

    /// Read-only access to a specific data worker's scratch slot.
    ///
    /// `i` is the data worker index in `0..data_worker_count()`.
    /// Internally this maps to wasm worker index `i + 1` (skipping the
    /// coordinator's own scratch slot at wasm index 0, which is unused
    /// under the coordinator-lane model).
    ///
    /// Same barrier discipline as [`CoordinatorCtx::scratch`]: only call
    /// after a barrier the target data worker has reached.
    pub fn data_scratch(&self, i: u32) -> &W {
        self.scratch(i + 1)
    }

    /// **Target-specific.** Mark the tasklet as cancelled. Host mock
    /// only — on wasm32 the runtime drives cancellation via
    /// `fb_tasklet_cancelled()` and the guest has no `cancel()` lever.
    /// Workers will observe this on their next
    /// [`WorkerCtx::cancelled`] check or barrier wait.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// **Behavior diverges across targets.** See the "Source portability"
    /// section of `docs/grafos/shared-memory-tasklet-programming-model.md`.
    ///
    /// On the **host mock** this call:
    ///
    /// 1. Returns `Err(Cancelled)` if cancellation has already been
    ///    committed at the call site.
    /// 2. Does NOT actually block on anything. The coordinator lane is
    ///    not a participant in the `std::sync::Barrier` used inside
    ///    [`CoordinatorCtx::parallel_for_workers`] (that barrier sizes
    ///    to the data-worker count only).
    ///
    /// On **wasm32** ([`super::guest::CoordinatorCtx::barrier`]) this
    /// same call IS a real cross-lane barrier with the data workers,
    /// because wasm32 worker 0 re-enters `tasklet_run` alongside
    /// every other worker and all lanes rendezvous at
    /// `fb_barrier_wait()`.
    ///
    /// The recommended portable pattern: use
    /// [`CoordinatorCtx::parallel_for_workers`] as the phase boundary
    /// on host (which already performs the join), and use
    /// `coord.barrier()?` between phases on wasm32. A single source
    /// body gated with `#[cfg(target_arch = "wasm32")]` is the honest
    /// tool here — the SDK does not paper this over.
    ///
    /// Phase 48.14 closes Phase 48.3 follow-up #18 (cancellable-barrier
    /// upgrade) as WONTFIX: the divergence is documented as an
    /// intentional target-specific behavior, not slated for semantic
    /// unification.
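    ///
    /// A minimal sketch of that recommended split (`ignore`-fenced; the
    /// wasm32 arm assumes the guest `barrier()` described above):
    ///
    /// ```ignore
    /// // ... phase 1 ran via coord.parallel_for_workers(...) ...
    /// // Phase boundary:
    /// #[cfg(target_arch = "wasm32")]
    /// coord.barrier()?;
    /// // On the host mock, parallel_for_workers already joined every
    /// // data worker, so no extra rendezvous is needed here.
    /// // ... phase 2 ...
    /// ```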
    pub fn barrier(&self) -> core::result::Result<(), Cancelled> {
        if self.cancelled.load(Ordering::SeqCst) {
            Err(Cancelled)
        } else {
            Ok(())
        }
    }

    /// Write the coordinator's output bytes. Mirrors the wasm32 guest
    /// `CoordinatorCtx::set_output`: programmer calls this to emit the
    /// tasklet output and then returns `Ok(())` from the coordinator
    /// closure.
    ///
    /// On wasm32 the runtime copies `bytes` into the host-managed output
    /// region and the coordinator's `tasklet_run` returns the number of
    /// bytes written. Here on the host mock, `launch` collects the
    /// recorded output and returns it inside [`SharedTaskletResult`].
    pub fn set_output(&mut self, bytes: &[u8]) {
        self.output.clear();
        self.output.extend_from_slice(bytes);
    }

    /// Cooperatively reconcile fuel with the lease-wide shared pool.
    ///
    /// Returns `Ok(())` on a successful debit. Returns
    /// `Err(FuelExhausted)` when the shared pool is exhausted, in which
    /// case the coordinator MUST stop computing immediately — there is
    /// no recovery path; a subsequent fuel-consuming instruction will
    /// trap fail-closed.
    ///
    /// V2 (shared-pool) workers MUST call this at safe points within
    /// any compute loop that may consume more fuel than the initial
    /// precharge. On V1 (per-worker-slice) launches this method always
    /// returns `Err(FuelExhausted)` because no shared pool is installed
    /// — V1 source should not call it.
    ///
    /// `amount == 0` is a silent no-op returning `Ok(())`.
    ///
    /// Mirrors [`super::guest::CoordinatorCtx::fuel_checkpoint`] —
    /// source compiled for either target observes identical
    /// success/failure semantics.
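    ///
    /// A minimal sketch of the intended call shape (`ignore`-fenced;
    /// the 1024-unit tick and `process_chunk` are illustrative
    /// placeholders, not a calibrated cost model):
    ///
    /// ```ignore
    /// for chunk in work.chunks(1024) {
    ///     // Reconcile before burning past the precharged slice;
    ///     // Err(FuelExhausted) propagates as TaskletError::FuelExhausted.
    ///     coord.fuel_checkpoint(1024)?;
    ///     process_chunk(chunk);
    /// }
    /// ```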
    pub fn fuel_checkpoint(&mut self, amount: u64) -> core::result::Result<(), FuelExhausted> {
        if amount > i64::MAX as u64 {
            return Err(FuelExhausted);
        }
        let result = mock::fb_fuel_checkpoint(amount as i64);
        if result == 0 {
            Ok(())
        } else {
            Err(FuelExhausted)
        }
    }

    /// **Target-specific.** Run a worker phase: invokes `body` once per
    /// **data worker lane** in parallel, each with its own [`WorkerCtx`].
    /// Returns when every data worker has completed (the implicit
    /// tasklet-wide barrier).
    ///
    /// Phase 48.3 — runs the body on data worker lanes
    /// `1..worker_count()`. Worker 0 is the coordinator lane and does
    /// not run the worker closure on either host mock or wasm32
    /// targets — this matches the wasm32 SDK contract in
    /// [`super::guest::run_shared_memory_tasklet`]. The number of body
    /// invocations is therefore `data_worker_count() == worker_count() - 1`.
    ///
    /// `body` may compute on shared state (via interior mutability such as
    /// atomics) and on its own scratch. It may NOT perform authority-bearing
    /// hostcalls — those are coordinator-only by contract.
    ///
    /// This method is host-mock-only. The wasm32 guest drives workers by
    /// having the runtime call `tasklet_run` once per lane; a program
    /// writing source for both targets must `#[cfg]`-gate the outer
    /// entry-point structure (see "Cross-target source patterns" in
    /// the build guide). The worker closure *body* can still be shared
    /// verbatim between the two entry points.
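    ///
    /// A minimal sketch of one worker phase (`ignore`-fenced;
    /// `Shared`/`Scratch` are the example types from the module docs):
    ///
    /// ```ignore
    /// coord.parallel_for_workers(|w| {
    ///     // Publish into shared state atomically; keep private
    ///     // results in this lane's own scratch.
    ///     w.shared().counter.fetch_add(1, Ordering::SeqCst);
    ///     w.scratch_mut().local = w.data_worker_index();
    /// })?;
    /// // Implicit barrier: every data worker has completed here.
    /// ```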
    pub fn parallel_for_workers<F>(&mut self, body: F) -> core::result::Result<(), TaskletError>
    where
        F: Fn(&mut WorkerCtx<S, W>) + Sync + Send,
    {
        let workers = self.workers;
        // Validated at builder time; defensive assert for the rare path
        // where a caller constructed a CoordinatorCtx by hand.
        assert!(
            workers >= 2,
            "shared-memory tasklet requires at least 2 worker lanes \
             (1 coordinator + >=1 data worker)"
        );
        let shared = self.shared.clone();
        let cancelled = self.cancelled.clone();
        let shared_fuel_pool = self.shared_fuel_pool.clone();
        let input_slice: &[u8] = &self.input;
        // Barrier counts data workers only — the coordinator lane does
        // not participate in the worker barrier.
        let data_count = (workers - 1) as usize;
        let barrier = Arc::new(std::sync::Barrier::new(data_count));

        // Take exclusive references to each scratch slot. We split the Vec
        // into per-element &mut and hand each lane its own. Worker 0's
        // scratch slot is unused under the coordinator-lane model but we
        // still hold its &mut so the index math lines up with wasm32.
        let mut slots: Vec<&mut W> = self.scratches.iter_mut().collect();

        std::thread::scope(|scope| {
            let body = &body;
            let mut handles = Vec::with_capacity(data_count);
            let mut indexed: Vec<(u16, &mut W)> = slots
                .drain(..)
                .enumerate()
                .map(|(i, s)| (i as u16, s))
                .collect();

            // Drop worker 0's slot — the coordinator lane does not run
            // the body. Its scratch slot remains untouched in this
            // phase, matching wasm32 where worker 0's scratch is
            // accessible to the coordinator via `coord.scratch(0)` but
            // never written by `parallel_for_workers`.
            let _coord_slot = indexed.remove(0);

            for _ in 1..workers {
                let (idx, scratch_ref) = indexed.remove(0);
                let shared = shared.clone();
                let cancelled = cancelled.clone();
                let barrier = barrier.clone();
                let input = input_slice;
                let shared_fuel_pool = shared_fuel_pool.clone();
                handles.push(scope.spawn(move || {
                    let slot = WorkerSlot {
                        worker_index: idx as i32,
                        worker_count: workers as i32,
                        cancelled: cancelled.clone(),
                        barrier: Some(barrier.clone()),
                        shared_ptr: 0,
                        shared_len: 0,
                        scratch_ptr: 0,
                        scratch_len: 0,
                        shared_fuel_pool,
                    };
                    with_worker_slot(slot, || {
                        let mut wctx = WorkerCtx {
                            shared,
                            scratch: scratch_ref,
                            worker_index: idx,
                            worker_count: workers,
                            cancelled,
                            barrier,
                            input,
                        };
                        body(&mut wctx);
                    });
                }));
            }
            for h in handles {
                let _ = h.join();
            }
        });

        if self.cancelled.load(Ordering::SeqCst) {
            return Err(TaskletError::Cancelled);
        }
        Ok(())
    }
}

// ---------------------------------------------------------------------------
// WorkerCtx
// ---------------------------------------------------------------------------

/// Programmer-facing worker context.
///
/// Workers may read shared state, mutate their own scratch, observe
/// cancellation, and synchronize at the implicit tasklet-wide barrier.
/// Workers must not perform authority-bearing hostcalls.
pub struct WorkerCtx<'a, S, W>
where
    S: Send + Sync + 'static,
    W: Send + 'static,
{
    shared: Arc<S>,
    scratch: &'a mut W,
    worker_index: u16,
    worker_count: u16,
    cancelled: Arc<AtomicBool>,
    barrier: Arc<std::sync::Barrier>,
    input: &'a [u8],
}

impl<'a, S, W> WorkerCtx<'a, S, W>
where
    S: Send + Sync + 'static,
    W: Send + 'static,
{
    /// This worker's raw lane index in `[0, worker_count())`. Under the
    /// coordinator-lane model, worker closures only ever observe indices
    /// `1..worker_count()` (worker 0 is the coordinator lane).
    pub fn worker_index(&self) -> u16 {
        self.worker_index
    }

    /// Total worker count for this tasklet.
    pub fn worker_count(&self) -> u16 {
        self.worker_count
    }

    /// Index of this worker among data workers (excluding the
    /// coordinator lane).
    ///
    /// Data workers are wasm worker indices `1..worker_count()`,
    /// exposed here as `0..data_worker_count()`. Use this for
    /// partitioning input across data workers via `partition::range` or
    /// `partition::tiles`. Mirrors the wasm32 guest helper.
    ///
    /// Under the Phase 48.3 coordinator-lane programming model, worker
    /// 0 only runs the coordinator closure, so this method is only ever
    /// called from a worker closure invocation where `worker_index() >=
    /// 1`, and the subtraction is structurally safe.
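    ///
    /// A minimal sketch of input partitioning (`ignore`-fenced; the
    /// integer widths accepted by `partition::range` are assumed from
    /// this module's unit tests, not restated authoritatively):
    ///
    /// ```ignore
    /// let r = partition::range(
    ///     w.input().len(),
    ///     w.data_worker_index() as usize,
    ///     w.data_worker_count() as usize,
    /// );
    /// for i in r {
    ///     // ... process w.input()[i] ...
    /// }
    /// ```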
    pub fn data_worker_index(&self) -> u32 {
        (self.worker_index as u32) - 1
    }

    /// Number of data workers (worker lanes excluding the coordinator
    /// lane).
    ///
    /// Equal to `worker_count() - 1`. Always at least 1 for a valid
    /// shared-memory tasklet (the SDK builder requires `workers >= 2`).
    pub fn data_worker_count(&self) -> u32 {
        (self.worker_count as u32) - 1
    }

    /// **Behavior diverges across targets.** See the "Source portability"
    /// section of `docs/grafos/shared-memory-tasklet-programming-model.md`.
    ///
    /// Wait at the implicit tasklet-wide barrier. Returns
    /// [`Err(Cancelled)`] if the tasklet was cancelled by the time this
    /// worker releases from the barrier.
    ///
    /// On the **host mock** this IS a real barrier among the data
    /// workers: the underlying `std::sync::Barrier` is sized to
    /// `data_worker_count()` (not `worker_count()`), and the
    /// coordinator lane is not a participant. That means a host-side
    /// `w.barrier()` rendezvouses with the other data workers, but NOT
    /// with the coordinator.
    ///
    /// On **wasm32** ([`super::guest::WorkerCtx::barrier`]) this call
    /// rendezvouses with the coordinator lane as well, because worker 0
    /// re-enters `tasklet_run` and participates in `fb_barrier_wait()`.
    ///
    /// The asymmetry in a single sentence: on the host mock,
    /// `coord.barrier()` is a no-op-with-cancel-check while `w.barrier()`
    /// is a real data-worker rendezvous; on wasm32 both are real
    /// full-lane barriers. Source that relies on cross-lane
    /// coordinator/worker rendezvous must `#[cfg]`-gate this call.
    pub fn barrier(&self) -> core::result::Result<(), Cancelled> {
        self.barrier.wait();
        if self.cancelled.load(Ordering::SeqCst) {
            Err(Cancelled)
        } else {
            Ok(())
        }
    }

    /// Whether the tasklet has been cancelled.
    pub fn cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }

    /// Read-only access to shared state. For shared mutation, the program
    /// uses interior mutability inside `S` (e.g. atomics).
    pub fn shared(&self) -> &S {
        &self.shared
    }

    /// Input bytes provided to the tasklet at submission time.
    ///
    /// Mirrors [`CoordinatorCtx::input`]: every worker sees the same
    /// bytes read-only. Workers must not mutate these bytes; for
    /// cross-worker communication use the `Shared` region with atomics.
    ///
    /// The returned slice is tied to the context's lifetime rather
    /// than to `&self`, so callers can hold it across a
    /// `scratch_mut()` borrow.
    pub fn input(&self) -> &'a [u8] {
        self.input
    }

    /// Mutable access to *this* worker's private scratch. The type system
    /// guarantees that no other worker can touch this slot.
    pub fn scratch_mut(&mut self) -> &mut W {
        self.scratch
    }

    /// **Target-specific.** Read-only access to *this* worker's scratch.
    /// Host mock only — the wasm32 guest's `WorkerCtx` exposes
    /// `scratch_mut(&mut self)` only and has no read-only accessor.
    pub fn scratch(&self) -> &W {
        self.scratch
    }

    /// Cooperatively reconcile fuel with the lease-wide shared pool.
    ///
    /// Returns `Ok(())` on a successful debit. Returns
    /// `Err(FuelExhausted)` when the shared pool is exhausted, in which
    /// case the worker MUST stop computing immediately — there is no
    /// recovery path; a subsequent fuel-consuming instruction will trap
    /// fail-closed.
    ///
    /// V2 (shared-pool) workers MUST call this at safe points within
    /// any compute loop that may consume more fuel than the initial
    /// precharge. On V1 (per-worker-slice) launches this method always
    /// returns `Err(FuelExhausted)` because no shared pool is installed
    /// — V1 source should not call it.
    ///
    /// `amount == 0` is a silent no-op returning `Ok(())`.
    ///
    /// Mirrors [`super::guest::WorkerCtx::fuel_checkpoint`].
    pub fn fuel_checkpoint(&mut self, amount: u64) -> core::result::Result<(), FuelExhausted> {
        if amount > i64::MAX as u64 {
            return Err(FuelExhausted);
        }
        let result = mock::fb_fuel_checkpoint(amount as i64);
        if result == 0 {
            Ok(())
        } else {
            Err(FuelExhausted)
        }
    }
}

// ---------------------------------------------------------------------------
// SharedRegion / WorkerScratch — typed views over raw bytes.
//
// These exist for programs that want raw byte-addressed access to the
// shared / scratch regions instead of a typed `S` / `W`. The runtime will
// eventually back these with linear-memory slices; in the mock SDK they
// are zero-length placeholders.
// ---------------------------------------------------------------------------

/// Typed wrapper over the shared region of a tasklet.
pub struct SharedRegion<T> {
    _phantom: PhantomData<T>,
    len: u32,
}

impl<T> SharedRegion<T> {
    /// Returns the length of the shared region in bytes.
    pub fn len_bytes(&self) -> u32 {
        self.len
    }
}

/// Typed wrapper over a single worker's scratch region.
///
/// The lifetime of `WorkerScratch` is tied to the worker lane that owns it,
/// so it cannot alias another worker's scratch.
pub struct WorkerScratch<T> {
    _phantom: PhantomData<T>,
    len: u32,
}

impl<T> WorkerScratch<T> {
    /// Returns the length of the scratch region in bytes.
    pub fn len_bytes(&self) -> u32 {
        self.len
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cpu::CpuBuilder;
    use crate::cpu_shared::partition::{range, tiles, Tile};
    use crate::host;
    use std::collections::HashSet;
    use std::sync::atomic::AtomicU32;

    #[derive(Default)]
    struct TestShared {
        counter: AtomicU32,
    }

    #[derive(Default, Clone)]
    struct TestScratch {
        local: u32,
    }

    fn cpu() -> crate::cpu::CpuLease {
        host::reset_mock();
        CpuBuilder::new().cores(4).acquire().expect("acquire")
    }

    #[test]
    fn builder_fluent_api_compiles() {
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(1024)
            .scratch_bytes_per_worker(64)
            .fuel(1_000_000)
            .input(b"hi".to_vec())
            .launch(|coord| {
                assert_eq!(coord.worker_count(), 4);
                assert_eq!(coord.input(), b"hi");
                coord.set_output(b"ok");
                Ok(())
            })
            .expect("launch");
        assert_eq!(result.exit_code, 0);
        assert_eq!(&result.output, b"ok");
    }

    #[test]
    fn rejects_workers_gt_cores() {
        let lease = cpu();
        let err = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|_c| Ok(()))
            .unwrap_err();
        assert_eq!(err, TaskletError::InvalidWorkerCount);
    }

    #[test]
    fn rejects_oversize_envelope() {
        let lease = cpu();
        let err = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(100)
            .scratch_bytes_per_worker(50)
            .max_linear_memory(150) // 100 + 4 * 50 = 300 > 150
            .launch(|_c| Ok(()))
            .unwrap_err();
        assert_eq!(err, TaskletError::MemoryEnvelopeInvalid);
    }

    #[test]
    fn partition_range_divides_evenly() {
        assert_eq!(range(100, 0, 4), 0..25);
        assert_eq!(range(100, 1, 4), 25..50);
        assert_eq!(range(100, 2, 4), 50..75);
        assert_eq!(range(100, 3, 4), 75..100);
    }

    #[test]
    fn partition_range_handles_remainder() {
        let r0 = range(10, 0, 3);
        let r1 = range(10, 1, 3);
        let r2 = range(10, 2, 3);
        // Full coverage, no overlap.
        assert_eq!(r0.start, 0);
        assert_eq!(r0.end, r1.start);
        assert_eq!(r1.end, r2.start);
        assert_eq!(r2.end, 10);
        // The first (10 % 3) = 1 worker gets one extra element.
        assert_eq!(r0.len(), 4);
        assert_eq!(r1.len(), 3);
        assert_eq!(r2.len(), 3);
    }

    #[test]
    fn partition_tiles_covers_full_image() {
        let mut covered: HashSet<(u32, u32)> = HashSet::new();
        let mut total_pixels = 0usize;
        for wi in 0..4u16 {
            for Tile { x, y, w, h } in tiles(64, 64, 16, 16, wi, 4) {
                for dy in 0..h {
                    for dx in 0..w {
                        let inserted = covered.insert((x + dx, y + dy));
                        assert!(inserted, "pixel covered twice");
                        total_pixels += 1;
                    }
                }
            }
        }
        assert_eq!(total_pixels, 64 * 64);
        assert_eq!(covered.len(), 64 * 64);
    }

    #[test]
    fn coordinator_ctx_exposes_worker_count() {
        let lease = cpu();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(8)
            .workers(8)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|coord| {
                assert_eq!(coord.worker_count(), 8);
                Ok(())
            })
            .unwrap();
    }

    #[test]
    fn worker_ctx_exposes_worker_index() {
        // Phase 48.3 — under the coordinator-lane model,
        // parallel_for_workers runs the body on data worker lanes
        // 1..worker_count(). With workers=4, the body sees raw wasm
        // worker indices [1, 2, 3] (NOT [0, 1, 2, 3]).
        let lease = cpu();
        let seen = Arc::new(std::sync::Mutex::new(Vec::<u16>::new()));
        let seen_data = Arc::new(std::sync::Mutex::new(Vec::<u32>::new()));
        let seen_clone = seen.clone();
        let seen_data_clone = seen_data.clone();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(move |coord| {
                assert_eq!(coord.data_worker_count(), 3);
                let s = seen_clone.clone();
                let sd = seen_data_clone.clone();
                coord
                    .parallel_for_workers(move |w| {
                        assert_eq!(w.worker_count(), 4);
                        assert_eq!(w.data_worker_count(), 3);
                        s.lock().unwrap().push(w.worker_index());
                        sd.lock().unwrap().push(w.data_worker_index());
                    })
                    .unwrap();
                Ok(())
            })
            .unwrap();
        let mut got = seen.lock().unwrap().clone();
        got.sort();
        assert_eq!(got, vec![1, 2, 3]);
        let mut got_data = seen_data.lock().unwrap().clone();
        got_data.sort();
        assert_eq!(got_data, vec![0, 1, 2]);
    }

    #[test]
    fn shared_region_visible_to_all_workers() {
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(1024)
            .scratch_bytes_per_worker(64)
            .launch(|coord| {
                coord
                    .parallel_for_workers(|w| {
                        w.shared().counter.fetch_add(1, Ordering::SeqCst);
                        w.scratch_mut().local = w.worker_index() as u32;
                    })
                    .unwrap();
                let n = coord.shared().counter.load(Ordering::SeqCst);
                coord.set_output(&n.to_le_bytes());
                Ok(())
            })
            .unwrap();
        // Phase 48.3 — the coordinator lane (worker 0) does NOT run the
        // worker closure, so only `data_worker_count() == 3` lanes
        // increment the counter when launched with workers=4.
        assert_eq!(u32::from_le_bytes(result.output.try_into().unwrap()), 3);
    }

    #[test]
    fn worker_ctx_sees_input_bytes() {
        let lease = cpu();
        let seen = Arc::new(std::sync::Mutex::new(Vec::<Vec<u8>>::new()));
        let seen_clone = seen.clone();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .input(b"hello workers".to_vec())
            .launch(move |coord| {
                let s = seen_clone.clone();
                assert_eq!(coord.input(), b"hello workers");
                coord
                    .parallel_for_workers(move |w| {
                        assert_eq!(w.input(), b"hello workers");
                        s.lock().unwrap().push(w.input().to_vec());
                    })
                    .unwrap();
                Ok(())
            })
            .unwrap();
        // Phase 48.3 — only the 3 data worker lanes (workers 1..4)
        // run the body; worker 0 is the coordinator lane.
        let got = seen.lock().unwrap().clone();
        assert_eq!(got.len(), 3);
        for v in got {
            assert_eq!(v, b"hello workers");
        }
    }

    #[test]
    fn coordinator_reads_cross_worker_scratch_after_barrier() {
        let lease = cpu();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(64)
            .launch(|coord| {
                // Each data worker writes its (index * 10) into its
                // scratch. parallel_for_workers joins all data
                // workers before returning, which acts as the
                // barrier.
                coord
                    .parallel_for_workers(|w| {
                        w.scratch_mut().local = (w.worker_index() as u32) * 10;
                    })
                    .unwrap();
                // Now read every worker's scratch from the
                // coordinator. Worker 0 (coordinator lane) was not
                // touched, so its slot stays at the default 0;
                // workers 1, 2, 3 wrote 10, 20, 30. Sum = 60.
                //
                // Coincidence note: the post-Phase-48.3 sum
                // (0 + 10 + 20 + 30) happens to equal the
                // pre-Phase-48.3 sum (0 + 10 + 20 + 30 — same set
                // because worker 0 always wrote 0 anyway). If this
                // assertion's magic numbers are ever changed, the
                // expected value MUST be re-derived against the
                // current "data workers only" model — do NOT
                // assume the old "all lanes participate" math.
                let mut sum: u32 = 0;
                for i in 0..coord.worker_count() {
                    sum += coord.scratch(u32::from(i)).local;
                }
                coord.set_output(&sum.to_le_bytes());
                Ok(())
            })
            .map(|r| {
                assert_eq!(u32::from_le_bytes(r.output.try_into().unwrap()), 60);
            })
            .unwrap();
    }

    #[test]
    fn rejects_single_lane_shared_memory_tasklet() {
        // Phase 48.3 — a 1-lane shared-memory tasklet has a coordinator
        // but no data workers and cannot make progress; the builder
        // must reject up front.
        let lease = cpu();
        let err = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(1)
            .workers(1)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|_c| Ok(()))
            .unwrap_err();
        assert_eq!(err, TaskletError::InvalidWorkerCount);
    }

    #[test]
    fn fuel_checkpoint_succeeds_until_pool_exhausted() {
        // Phase 48.13 W2b — V2 shared-pool launch with a small max_fuel.
        // The coordinator calls fuel_checkpoint repeatedly until the
        // pool exhausts, and the test verifies the deterministic
        // transition from Ok(()) to Err(FuelExhausted).
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(2)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            // workers=2 → precharge math: slice = max(20_000/2/4, 4096)
            // = max(2500, 4096) = 4096; precharge consumes 2 * 4096 = 8192
            // out of 20_000, leaving 11_808 in the pool for checkpoints.
            .with_max_fuel(20_000)
            .launch(|coord| {
                // Drain the pool in 1024-unit ticks. We don't know
                // the exact starting balance (it depends on the
                // precharge math), but we know it's bounded above
                // by max_fuel and that successive checkpoints must
                // eventually return FuelExhausted.
                let mut succeeded: u32 = 0;
                let mut hit_exhaustion = false;
                for _ in 0..1000 {
                    match coord.fuel_checkpoint(1024) {
                        Ok(()) => succeeded += 1,
                        Err(FuelExhausted) => {
                            hit_exhaustion = true;
                            break;
                        }
                    }
                }
                assert!(hit_exhaustion, "pool must eventually exhaust");
                // After a failed 1024-unit tick the pool may still hold
                // 0..=1023 units. Drain it unit by unit until even a
                // single-unit checkpoint fails, then assert that any
                // further debit also fails.
                for _ in 0..1024 {
                    if coord.fuel_checkpoint(1).is_err() {
                        break;
                    }
                }
                assert_eq!(coord.fuel_checkpoint(1), Err(FuelExhausted));
                assert_eq!(coord.fuel_checkpoint(1024), Err(FuelExhausted));
                // amount == 0 is a silent no-op returning Ok(()) even
                // post-exhaustion.
                assert_eq!(coord.fuel_checkpoint(0), Ok(()));
                coord.set_output(&succeeded.to_le_bytes());
                Ok(())
            })
            .expect("launch");
        let succeeded = u32::from_le_bytes(result.output.try_into().unwrap());
        // Sanity: at least one checkpoint must have succeeded (the pool
        // is large enough to accommodate several 1024-unit debits).
        assert!(succeeded >= 1, "at least one checkpoint must succeed");
    }

    #[test]
    fn fuel_checkpoint_returns_err_when_no_pool_installed() {
        // Phase 48.13 W2b — a launch that does NOT call with_max_fuel
        // simulates a V1 worker. fuel_checkpoint returns
        // Err(FuelExhausted) on every call (other than amount == 0).
        let lease = cpu();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(2)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            // No with_max_fuel — V1 simulation.
            .launch(|coord| {
                assert_eq!(coord.fuel_checkpoint(1), Err(FuelExhausted));
                assert_eq!(coord.fuel_checkpoint(1024), Err(FuelExhausted));
                // amount == 0 is still a no-op.
                assert_eq!(coord.fuel_checkpoint(0), Ok(()));
                coord
                    .parallel_for_workers(|w| {
                        assert_eq!(w.fuel_checkpoint(1), Err(FuelExhausted));
                        assert_eq!(w.fuel_checkpoint(0), Ok(()));
                    })
                    .unwrap();
                Ok(())
            })
            .expect("launch");
    }

    #[test]
    fn cancel_propagates_to_workers() {
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(2)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|coord| {
                coord.cancel();
                assert!(coord.cancelled());
                Ok(())
            });
        assert_eq!(result.unwrap_err(), TaskletError::Cancelled);
    }
}
1446}