grafos_std/cpu_shared/
host_mock.rs

1//! Shared-memory tasklet SDK surface — host mock (Phase 48.3 P3, Phase 48.14).
2//!
3//! This module exposes the *programmer-facing* types for the shared-memory
4//! tasklet execution mode: a builder, a coordinator context, a worker
5//! context, typed views over the shared region and per-worker scratch, and
6//! partition helpers.
7//!
8//! # Source portability (Phase 48.14)
9//!
10//! The host mock and the wasm32 guest ([`super::guest`]) deliberately do
11//! NOT present a single source-portable surface. They present **three
12//! levels** of parity, documented in detail in
13//! [`docs/grafos/shared-memory-tasklet-programming-model.md`][portability]:
14//!
15//! 1. **Behavioral portability** is the primary promise. The fidelity
16//!    test `shared_memory_host_wasm_fidelity_e2e` guarantees that the
17//!    same logical workload produces byte-identical output on both
18//!    targets.
19//! 2. **Source portability** is a narrower documented subset. Methods on
20//!    the safe-list compile and behave identically on both targets. The
21//!    cfg-list and behavior-divergent sub-list do NOT.
22//! 3. **`#[cfg]` is the honest tool outside that subset.** Methods
23//!    tagged with `**Target-specific.**` below require per-target source
24//!    when used portably.
25//!
26//! Methods carrying the `**Target-specific.**` rustdoc tag are on the
27//! cfg-list. Methods carrying `**Behavior diverges across targets.**`
28//! compile on both targets but mean different things (see
29//! [`CoordinatorCtx::barrier`] and [`WorkerCtx::barrier`]).
30//!
31//! [portability]: https://example.invalid
32//! (See `docs/grafos/shared-memory-tasklet-programming-model.md#source-portability-between-host-mock-and-wasm32-guest`)
33//!
34//! ## Coordinator-lane programming model
35//!
36//! Phase 48.3 — the host mock implements the same coordinator-lane model
37//! as the wasm32 guest path in [`super::guest`]: worker 0 is the
38//! coordinator lane and runs only the coordinator closure;
39//! [`CoordinatorCtx::parallel_for_workers`] spawns the worker body on
40//! data worker lanes `1..worker_count()` (skipping worker 0). Source
41//! compiled for either target produces identical observable behavior
42//! under the `SharedMemory` execution mode, so a programmer can
43//! prototype against the host mock and deploy unchanged on a real
44//! fabricBIOS runtime. The `data_worker_index() / data_worker_count() /
45//! data_scratch(i)` helpers expose the data-plane lane space for
46//! partitioning, while the raw `worker_index() / worker_count() /
47//! scratch(i)` accessors keep "raw wasm worker space" semantics for
48//! advanced cases.
49//!
50//! ## What this is
51//!
52//! - The types developers will use to write a shared-memory tasklet.
53//! - A host-mock launch path so the SDK is unit-testable without a runtime.
54//! - The companion ABI declaration lives in [`crate::grafos_worker_v0`].
55//!
56//! ## What this is NOT
57//!
58//! - This is not the runtime. The wasmtime/wasmi-backed runtime that
59//!   actually executes shared-memory tasklets is delivered as part of the
60//!   P1 track. Here, `launch()` runs the coordinator and worker closures
61//!   directly on the host using a `std::thread::scope` plus a real
62//!   `std::sync::Barrier`, purely so the SDK abstractions can be exercised
63//!   in unit tests.
64//! - The programmer-facing API never mentions wasmi, wasmtime, or "threads".
65//!   The abstraction is *coordinator + workers* with explicit shared and
66//!   per-worker state.
67//!
68//! ## Programming model
69//!
70//! ```no_run
71//! use grafos_std::cpu::{CpuBuilder, CoordinatorCtx, WorkerCtx};
72//! use std::sync::atomic::{AtomicU32, Ordering};
73//!
74//! #[derive(Default)]
75//! struct Shared {
76//!     counter: AtomicU32,
77//! }
78//! #[derive(Default, Clone)]
79//! struct Scratch {
80//!     local: u32,
81//! }
82//!
83//! # grafos_std::host::reset_mock();
84//! let lease = CpuBuilder::new().cores(4).acquire().unwrap();
85//! let _ = lease.cpu().shared_memory_tasklet::<Shared, Scratch>()
86//!     .cores(4)
87//!     .workers(4)
88//!     .shared_bytes(1024)
89//!     .scratch_bytes_per_worker(64)
90//!     .fuel(1_000_000)
91//!     .launch(|coord: &mut CoordinatorCtx<Shared, Scratch>| {
92//!         // Coordinator: load input, orchestrate phases, emit output.
93//!         coord.parallel_for_workers(|w| {
94//!             w.scratch_mut().local = w.worker_index() as u32;
95//!             w.shared().counter.fetch_add(1, Ordering::SeqCst);
96//!         }).unwrap();
97//!         let n = coord.shared().counter.load(Ordering::SeqCst);
98//!         coord.set_output(&n.to_le_bytes());
99//!         Ok(())
100//!     });
101//! ```
102
103extern crate alloc;
104use alloc::sync::Arc;
105use alloc::vec::Vec;
106use core::marker::PhantomData;
107use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
108
109use crate::cpu::FabricCpu;
110use crate::error::FabricError;
111use crate::grafos_worker_v0::mock::{self, with_worker_slot, WorkerSlot};
112
/// Phase 48.13 W2b — minimum precharge slice per worker. Mirrors the
/// wasmtime runtime constant in the shared-memory dispatcher so the host
/// mock and the real runtime agree on the precharge math.
///
/// Consumed in `launch`'s pool construction:
/// `slice = max(max_fuel / workers / 4, MIN_INITIAL_SLICE)`.
const MIN_INITIAL_SLICE: u64 = 4096;
117
/// Result of a completed shared-memory tasklet.
///
/// Returned by [`SharedTaskletBuilder::launch`] on success. The host
/// mock's `Ok` path always reports `exit_code == 0`; failures surface
/// as [`TaskletError`] instead.
#[derive(Debug, Clone)]
pub struct SharedTaskletResult {
    /// Exit code (0 = success).
    pub exit_code: u64,
    /// Output bytes emitted by the coordinator via
    /// [`CoordinatorCtx::set_output`].
    pub output: Vec<u8>,
}
126
/// Errors produced by the shared-memory tasklet SDK surface.
///
/// These are SDK-side validation errors, distinct from the underlying
/// transport errors in [`FabricError`]. A successful submission may still
/// surface a [`FabricError`] from the runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskletError {
    /// `max_threads > num_cores`, or either is zero. Also returned when
    /// `max_threads < 2`: the coordinator-lane model needs worker 0
    /// (coordinator) plus at least one data worker lane.
    InvalidWorkerCount,
    /// Memory envelope `shared + max_threads * scratch_per_worker` overflowed
    /// or exceeded the configured tasklet linear-memory budget.
    MemoryEnvelopeInvalid,
    /// A required builder field was not set. The payload names the
    /// missing builder method (e.g. `"shared_bytes"`).
    Missing(&'static str),
    /// The whole tasklet was cancelled (revoke / expiry / explicit cancel).
    Cancelled,
    /// The lease-wide shared fuel pool is exhausted (Phase 48.14 — folds
    /// Phase 48.3 follow-up #35). Produced by `?`-propagation from
    /// [`CoordinatorCtx::fuel_checkpoint`] / [`WorkerCtx::fuel_checkpoint`]
    /// via the [`From<FuelExhausted>`] impl below, so programmer source
    /// can write `ctx.fuel_checkpoint(n)?` and propagate a typed fuel
    /// error without collapsing it into [`TaskletError::Cancelled`].
    FuelExhausted,
    /// Underlying transport / host error.
    Fabric(FabricError),
}
153
154impl From<FabricError> for TaskletError {
155    fn from(e: FabricError) -> Self {
156        TaskletError::Fabric(e)
157    }
158}
159
160impl From<FuelExhausted> for TaskletError {
161    fn from(_: FuelExhausted) -> Self {
162        TaskletError::FuelExhausted
163    }
164}
165
166impl From<Cancelled> for TaskletError {
167    fn from(_: Cancelled) -> Self {
168        TaskletError::Cancelled
169    }
170}
171
/// Returned by [`WorkerCtx::barrier`] / [`CoordinatorCtx::barrier`] when
/// the tasklet is cancelled while waiting on the barrier.
///
/// This mirrors [`super::guest::Cancelled`] on the wasm32 guest so that
/// source code written as `w.barrier()?` compiles unchanged against
/// either target. The host mock currently models cancellation with the
/// same `Arc<AtomicBool>` that backs [`CoordinatorCtx::cancel`] and
/// [`WorkerCtx::cancelled`]: after the underlying `std::sync::Barrier`
/// releases, we re-check the flag and surface `Err(Cancelled)` if set.
/// This is not a fully faithful simulation of a racing cancel mid-wait
/// — the mock cannot unstick a `std::sync::Barrier` that never
/// quorums — but it is sufficient for the "programmer writes
/// `barrier()?` and the type checks" source-parity goal.
///
/// Zero-sized marker; converts into [`TaskletError::Cancelled`] via
/// the `From` impl above for `?`-propagation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Cancelled;
187
/// Returned by [`WorkerCtx::fuel_checkpoint`] /
/// [`CoordinatorCtx::fuel_checkpoint`] when the lease-wide shared fuel
/// pool is exhausted (or, defensively, when called by a V1 worker that
/// has no shared pool installed).
///
/// This mirrors [`super::guest::FuelExhausted`] on the wasm32 guest so
/// source written as `ctx.fuel_checkpoint(n)?` compiles unchanged
/// against either target. Phase 48.13 W2b: workers MUST stop computing
/// immediately on `Err(FuelExhausted)` — there is no recovery path; a
/// subsequent fuel-consuming instruction will trap fail-closed.
///
/// Zero-sized marker; converts into [`TaskletError::FuelExhausted`]
/// via the `From` impl above for `?`-propagation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FuelExhausted;
200
201impl core::fmt::Display for FuelExhausted {
202    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
203        f.write_str("shared fuel pool exhausted")
204    }
205}
206
207// ---------------------------------------------------------------------------
208// Builder
209// ---------------------------------------------------------------------------
210
/// Builder for a shared-memory tasklet.
///
/// Construct via [`FabricCpu::shared_memory_tasklet`]. The builder is
/// generic over the programmer's shared-state type `S` and per-worker
/// scratch type `W`. These are the types that the coordinator and workers
/// see in their respective contexts.
///
/// The builder is intentionally distinct from [`crate::cpu::TaskletBuilder`]
/// (the single-threaded path) so that "more cores" never silently means
/// "more threads": shared-memory execution is an explicit, named mode.
pub struct SharedTaskletBuilder<'a, S, W>
where
    S: Default + Send + Sync + 'static,
    W: Default + Send + 'static,
{
    // NOTE(review): never read on the host mock — kept to tie the
    // builder's lifetime to the CPU handle; confirm the runtime builder
    // uses it.
    _cpu: &'a FabricCpu,
    /// Required — CPU cores reserved for this tasklet ([`Self::cores`]).
    num_cores: Option<u16>,
    /// Required — worker lanes to launch ([`Self::workers`]);
    /// validated as `2 <= workers <= cores`.
    max_threads: Option<u16>,
    /// Required — shared-region size in bytes ([`Self::shared_bytes`]).
    shared_bytes: Option<u32>,
    /// Required — per-worker scratch size in bytes
    /// ([`Self::scratch_bytes_per_worker`]).
    scratch_bytes_per_worker: Option<u32>,
    /// Linear-memory budget; `shared + workers * scratch` must fit.
    /// Defaults to `u32::MAX`.
    max_linear_memory: u32,
    /// V1 total fuel budget. Defaults to 1_000_000.
    // NOTE(review): `launch` below never consumes this on the host mock.
    fuel: u64,
    /// Phase 48.13 W2b — optional V2 shared-pool max fuel. When set, the
    /// host mock constructs an `Arc<AtomicU64>` shared pool initialized
    /// to this value, precharges each worker's slice the same way the
    /// wasmtime runtime does, and installs the pool into every worker's
    /// `WorkerSlot`. When `None`, no pool is installed and the
    /// `fuel_checkpoint` SDK methods return `Err(FuelExhausted)` (V1
    /// behavior).
    max_fuel: Option<u64>,
    /// Input bytes the coordinator reads via [`CoordinatorCtx::input`].
    input: Vec<u8>,
    /// Carries `S`/`W` in the type without storing values of them.
    _phantom: PhantomData<(S, W)>,
}
244
impl<'a, S, W> SharedTaskletBuilder<'a, S, W>
where
    S: Default + Send + Sync + 'static,
    W: Default + Send + 'static,
{
    /// Fresh builder with every required field unset and defaults for
    /// the optional knobs: `max_linear_memory = u32::MAX`,
    /// `fuel = 1_000_000`, no V2 shared fuel pool, empty input.
    fn new(cpu: &'a FabricCpu) -> Self {
        Self {
            _cpu: cpu,
            num_cores: None,
            max_threads: None,
            shared_bytes: None,
            scratch_bytes_per_worker: None,
            max_linear_memory: u32::MAX,
            fuel: 1_000_000,
            max_fuel: None,
            input: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Number of CPU cores reserved for this tasklet (`num_cores`).
    pub fn cores(mut self, n: u16) -> Self {
        self.num_cores = Some(n);
        self
    }

    /// Number of worker lanes the runtime will launch (`max_threads`).
    /// Must satisfy `workers <= cores` (and `workers >= 2`; see
    /// `validate`).
    pub fn workers(mut self, n: u16) -> Self {
        self.max_threads = Some(n);
        self
    }

    /// Size of the shared region in bytes.
    pub fn shared_bytes(mut self, n: u32) -> Self {
        self.shared_bytes = Some(n);
        self
    }

    /// Size of each worker's private scratch region in bytes.
    pub fn scratch_bytes_per_worker(mut self, n: u32) -> Self {
        self.scratch_bytes_per_worker = Some(n);
        self
    }

    /// Override the per-tasklet linear-memory budget. Defaults to `u32::MAX`.
    /// `shared + workers * scratch_per_worker` must fit within this.
    pub fn max_linear_memory(mut self, n: u32) -> Self {
        self.max_linear_memory = n;
        self
    }

    /// Total fuel budget shared across coordinator and all workers.
    // NOTE(review): stored but never consumed by the host mock's
    // `launch` — builder-surface parity only; confirm against the
    // wasmtime runtime's builder.
    pub fn fuel(mut self, n: u64) -> Self {
        self.fuel = n;
        self
    }

    /// Phase 48.13 W2b — opt this builder into the V2 shared-pool fuel
    /// model with `max_fuel` units in the lease-wide pool.
    ///
    /// When set, [`launch`](Self::launch) constructs an
    /// `Arc<AtomicU64>` initialized to `max_fuel` and installs it into
    /// each spawned worker's `WorkerSlot`, plus the coordinator's slot
    /// during the launch lifetime. The
    /// [`fuel_checkpoint`](WorkerCtx::fuel_checkpoint) SDK methods then
    /// CAS-decrement the pool the same way the wasmtime runtime does.
    ///
    /// When unset (the default), no pool is installed and
    /// `fuel_checkpoint` returns `Err(FuelExhausted)` — this preserves
    /// the V1 behavior of all existing tests, which never call
    /// `fuel_checkpoint` and so observe identical behavior.
    pub fn with_max_fuel(mut self, max_fuel: u64) -> Self {
        self.max_fuel = Some(max_fuel);
        self
    }

    /// Input bytes the coordinator will read from shared memory.
    pub fn input(mut self, bytes: Vec<u8>) -> Self {
        self.input = bytes;
        self
    }

    /// Check required fields, lane-count invariants, and the memory
    /// envelope; returns the validated envelope consumed by `launch`.
    ///
    /// # Errors
    ///
    /// - [`TaskletError::Missing`] for any unset required field.
    /// - [`TaskletError::InvalidWorkerCount`] for zero counts,
    ///   `workers > cores`, or `workers < 2`.
    /// - [`TaskletError::MemoryEnvelopeInvalid`] when
    ///   `shared + workers * scratch` overflows `u64` or exceeds
    ///   `max_linear_memory`.
    fn validate(&self) -> core::result::Result<SharedMemoryEnvelope, TaskletError> {
        let cores = self.num_cores.ok_or(TaskletError::Missing("cores"))?;
        let workers = self.max_threads.ok_or(TaskletError::Missing("workers"))?;
        let shared = self
            .shared_bytes
            .ok_or(TaskletError::Missing("shared_bytes"))?;
        let scratch = self
            .scratch_bytes_per_worker
            .ok_or(TaskletError::Missing("scratch_bytes_per_worker"))?;

        if cores == 0 || workers == 0 || workers > cores {
            return Err(TaskletError::InvalidWorkerCount);
        }
        // Phase 48.3 — shared-memory tasklets need a coordinator lane
        // (worker 0) AND at least one data worker lane (worker 1..N).
        // A single-lane shared-memory tasklet has no data workers and
        // can't make progress; reject up front.
        if workers < 2 {
            return Err(TaskletError::InvalidWorkerCount);
        }
        // shared + workers * scratch  with overflow check
        let scratch_total = (workers as u64).checked_mul(scratch as u64);
        let total = scratch_total.and_then(|s| s.checked_add(shared as u64));
        let total = total.ok_or(TaskletError::MemoryEnvelopeInvalid)?;
        if total > self.max_linear_memory as u64 {
            return Err(TaskletError::MemoryEnvelopeInvalid);
        }
        Ok(SharedMemoryEnvelope {
            workers,
            shared_bytes: shared,
            scratch_bytes_per_worker: scratch,
        })
    }

    /// **Target-specific.** Launch the tasklet on the host mock. This
    /// entry point is cfg-list: the wasm32 guest's equivalent is
    /// [`super::guest::run_shared_memory_tasklet`], which has a different
    /// shape (it is called BY the runtime, not BY the program, and it
    /// takes both a coordinator closure and a worker closure because
    /// every worker lane re-enters `tasklet_run`).
    ///
    /// Takes a single coordinator closure. The host mock drives data
    /// workers from inside
    /// [`CoordinatorCtx::parallel_for_workers`], so there is no separate
    /// worker closure at launch time. Phase 48.14 removed the previous
    /// vestigial `_worker` parameter — it was never called on host and
    /// only masked the genuine cross-target shape difference.
    ///
    /// The `coordinator` closure signature
    /// (`FnOnce(&mut CoordinatorCtx<S, W>) -> Result<(), TaskletError>`)
    /// is source-portable with the coordinator closure passed to
    /// `run_shared_memory_tasklet` on wasm32, so the closure *body* can
    /// live in a shared module while the two outer entry points live
    /// under `#[cfg(target_arch = "wasm32")]` gates. See the
    /// "Cross-target source patterns" section of
    /// `docs/grafos/shared-memory-tasklet-build-guide.md`.
    ///
    /// # Errors
    ///
    /// Any `validate` error; the coordinator closure's own error; or
    /// [`TaskletError::Cancelled`] if the cancellation flag is set when
    /// the coordinator returns.
    pub fn launch<FCoord>(
        self,
        coordinator: FCoord,
    ) -> core::result::Result<SharedTaskletResult, TaskletError>
    where
        FCoord: FnOnce(&mut CoordinatorCtx<S, W>) -> core::result::Result<(), TaskletError>,
    {
        let env = self.validate()?;

        // Construct the shared state and per-worker scratch on the host.
        // The runtime will eventually back these by linear memory; here we
        // back them by host-side typed values so SDK tests can exercise the
        // ctx APIs end to end.
        let shared = Arc::new(S::default());
        let mut scratches: Vec<W> = (0..env.workers).map(|_| W::default()).collect();
        let cancelled = Arc::new(AtomicBool::new(false));
        let input: Arc<Vec<u8>> = Arc::new(self.input);

        // Phase 48.13 W2b — V2 shared-pool fuel construction. If the
        // builder opted in via `with_max_fuel`, build the pool now and
        // precharge each worker's slice the same way the wasmtime
        // runtime does in `fabricbiosd`'s shared-memory dispatcher:
        //
        //     slice = max(max_fuel / max_threads / 4, MIN_INITIAL_SLICE)
        //
        // The slice is debited from the pool atomically. The mock
        // doesn't actually consume host-side native fuel — it only
        // tracks the logical pool state — but the precharge math
        // matches so SDK fidelity tests can compare against the runtime.
        let shared_fuel_pool: Option<Arc<AtomicU64>> = self.max_fuel.map(|max_fuel| {
            let pool = Arc::new(AtomicU64::new(max_fuel));
            let workers = env.workers as u64;
            let slice = core::cmp::max(max_fuel / workers / 4, MIN_INITIAL_SLICE);
            // Precharge per worker. Saturating at the pool floor preserves
            // the runtime invariant that we never debit below zero.
            for _ in 0..workers {
                let mut current = pool.load(Ordering::SeqCst);
                loop {
                    // Take at most what remains; an empty pool ends this
                    // worker's precharge without going negative.
                    let take = core::cmp::min(slice, current);
                    if take == 0 {
                        break;
                    }
                    match pool.compare_exchange_weak(
                        current,
                        current - take,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => break,
                        // CAS lost (or spuriously failed): retry against
                        // the freshly observed pool value.
                        Err(observed) => current = observed,
                    }
                }
            }
            pool
        });

        let mut coord = CoordinatorCtx {
            shared: shared.clone(),
            scratches: &mut scratches,
            workers: env.workers,
            shared_bytes: env.shared_bytes,
            scratch_bytes_per_worker: env.scratch_bytes_per_worker,
            cancelled: cancelled.clone(),
            input,
            output: Vec::new(),
            shared_fuel_pool: shared_fuel_pool.clone(),
        };

        // Install a coordinator-thread WorkerSlot for the duration of
        // the coordinator closure so `coord.fuel_checkpoint(n)` can
        // observe the shared pool through the same `fb_fuel_checkpoint`
        // shim that workers use. This mirrors wasm32 where worker 0 (the
        // coordinator lane) has its own runtime slot.
        let coord_slot = WorkerSlot {
            worker_index: 0,
            worker_count: env.workers as i32,
            cancelled: cancelled.clone(),
            barrier: None,
            shared_ptr: 0,
            shared_len: 0,
            scratch_ptr: 0,
            scratch_len: 0,
            shared_fuel_pool: shared_fuel_pool.clone(),
        };
        let coord_result = with_worker_slot(coord_slot, || coordinator(&mut coord));
        // Propagate the coordinator's own error before consulting the
        // cancellation flag, so an explicit Err wins over Cancelled.
        coord_result?;
        if cancelled.load(Ordering::SeqCst) {
            return Err(TaskletError::Cancelled);
        }
        let output = core::mem::take(&mut coord.output);
        Ok(SharedTaskletResult {
            exit_code: 0,
            output,
        })
    }
}
480
/// Validated lane/memory envelope produced by
/// `SharedTaskletBuilder::validate` and consumed by `launch`.
#[derive(Clone, Copy)]
struct SharedMemoryEnvelope {
    /// Total worker lanes (coordinator lane 0 plus data lanes); >= 2.
    workers: u16,
    /// Shared-region size in bytes.
    shared_bytes: u32,
    /// Per-worker private scratch size in bytes.
    scratch_bytes_per_worker: u32,
}
487
impl FabricCpu {
    /// Begin building a shared-memory tasklet.
    ///
    /// This is a distinct entry point from [`FabricCpu::submit`]. Reserving
    /// more CPU cores via [`crate::cpu::CpuBuilder::cores`] does not by
    /// itself produce a multi-worker tasklet — only this builder does.
    ///
    /// `S` is the shared-state type visible to all lanes; `W` is the
    /// per-worker scratch type. Both must be `Default`-constructible so
    /// the host mock can materialize them in `launch`.
    pub fn shared_memory_tasklet<S, W>(&self) -> SharedTaskletBuilder<'_, S, W>
    where
        S: Default + Send + Sync + 'static,
        W: Default + Send + 'static,
    {
        SharedTaskletBuilder::new(self)
    }
}
502
503// ---------------------------------------------------------------------------
504// CoordinatorCtx
505// ---------------------------------------------------------------------------
506
/// Programmer-facing coordinator context.
///
/// The coordinator is the only role that may perform authority-bearing
/// hostcalls (FBMU, FBBU, leases, services, logging). Workers may compute
/// on shared state and their own scratch, but must not call into those
/// host modules.
pub struct CoordinatorCtx<'a, S, W>
where
    S: Send + Sync + 'static,
    W: Send + 'static,
{
    /// Programmer-defined shared state, visible to every lane.
    shared: Arc<S>,
    /// One scratch slot per wasm worker lane. Index 0 belongs to the
    /// coordinator lane and is unused under the coordinator-lane model.
    scratches: &'a mut Vec<W>,
    /// Total worker lanes (coordinator + data workers); >= 2 for
    /// builder-launched tasklets.
    workers: u16,
    /// Configured shared-region size (host-mock bookkeeping).
    shared_bytes: u32,
    /// Configured per-worker scratch size (host-mock bookkeeping).
    scratch_bytes_per_worker: u32,
    /// Cooperative cancellation flag, shared with every worker lane.
    cancelled: Arc<AtomicBool>,
    /// Input bytes supplied at submission time.
    input: Arc<Vec<u8>>,
    /// Output bytes written by the coordinator via `set_output`. Phase
    /// 48.3 — cross-target API parity with the wasm32 guest: programmer
    /// writes `coord.set_output(&bytes); Ok(())` on both targets, and the
    /// host mock's `launch` surfaces this vec to the caller.
    output: Vec<u8>,
    /// Phase 48.13 W2b — V2 lease-wide shared fuel pool. `None` for V1
    /// (per-worker-slice) launches; `Some` when the builder opted in via
    /// `with_max_fuel`. Cloned into each worker's WorkerSlot in
    /// `parallel_for_workers`.
    shared_fuel_pool: Option<Arc<AtomicU64>>,
}
536
537impl<'a, S, W> CoordinatorCtx<'a, S, W>
538where
539    S: Send + Sync + 'static,
540    W: Send + 'static,
541{
    /// Number of worker lanes that will run.
    ///
    /// Counts the coordinator lane (worker 0) too; always >= 2 for
    /// builder-launched tasklets (validated at build time).
    pub fn worker_count(&self) -> u16 {
        self.workers
    }
546
    /// Whether the tasklet has been cancelled.
    ///
    /// Reads the shared flag with `SeqCst` ordering, so a `cancel()`
    /// issued from any lane is observed here.
    pub fn cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
551
    /// **Target-specific.** Bytes available in the shared region (host
    /// mock only; the wasm32 guest exposes length only through
    /// [`SharedRegion::len_bytes`]). Reports the configured size, not a
    /// measured one.
    pub fn shared_bytes(&self) -> u32 {
        self.shared_bytes
    }
558
    /// **Target-specific.** Bytes available in each worker's scratch
    /// region (host mock only). Reports the configured size from the
    /// builder.
    pub fn scratch_bytes_per_worker(&self) -> u32 {
        self.scratch_bytes_per_worker
    }
564
    /// Input bytes provided to the tasklet at submission time.
    ///
    /// Borrowed view; the bytes live as long as the context.
    pub fn input(&self) -> &[u8] {
        &self.input
    }
569
    /// Read access to shared state.
    ///
    /// Mutation goes through interior mutability in `S` (atomics etc.)
    /// — there is deliberately no `&mut S` accessor.
    pub fn shared(&self) -> &S {
        &self.shared
    }
574
    /// Read-only access to a specific worker's scratch slot.
    ///
    /// # Safety / discipline
    ///
    /// Only call this AFTER a barrier that the worker has reached. Reading
    /// a worker's scratch before that worker has finished writing to it is
    /// a race the SDK cannot detect. Typical pattern: workers write into
    /// their scratch slots and then call [`WorkerCtx::barrier`]; the
    /// coordinator calls its own barrier and only then reads
    /// `coord.scratch(i)` for each worker.
    ///
    /// Worker scratch is NOT a general shared-memory channel between
    /// workers. For ongoing communication during a phase, use the `Shared`
    /// region with atomics. Worker scratch is for results that get
    /// collected by the coordinator at well-defined phase boundaries.
    ///
    /// Takes `u32` to match the wasm32 guest signature so the same
    /// source compiles unchanged against both targets.
    ///
    /// # Panics
    ///
    /// Panics when `worker_index >= worker_count()` (slice index out of
    /// bounds).
    pub fn scratch(&self, worker_index: u32) -> &W {
        &self.scratches[worker_index as usize]
    }
596
    /// **Target-specific.** Mutable access to a single worker's scratch
    /// (coordinator-only). The host mock takes a worker index because
    /// the coordinator owns `&mut` references to every lane's scratch
    /// slot; the wasm32 guest's `CoordinatorCtx::scratch_mut` takes no
    /// arguments and returns worker 0's scratch slot only. Portable
    /// source that needs cross-lane mutable scratch access must
    /// `#[cfg]`-gate this call.
    ///
    /// # Panics
    ///
    /// Panics when `worker_index >= worker_count()` (slice index out of
    /// bounds).
    pub fn scratch_mut(&mut self, worker_index: u32) -> &mut W {
        &mut self.scratches[worker_index as usize]
    }
607
    /// Number of data workers (excluding the coordinator lane).
    ///
    /// Equal to `worker_count() - 1`. Mirrors the wasm32 guest helper
    /// of the same name; programmer code calling this compiles and runs
    /// identically on both targets.
    ///
    /// Cannot underflow for builder-launched tasklets: `validate`
    /// enforces `workers >= 2`.
    pub fn data_worker_count(&self) -> u32 {
        (self.workers as u32) - 1
    }
616
617    /// Read-only access to a specific data worker's scratch slot.
618    ///
619    /// `i` is the data worker index in `0..data_worker_count()`.
620    /// Internally this maps to wasm worker index `i + 1` (skipping the
621    /// coordinator's own scratch slot at wasm index 0, which is unused
622    /// under the coordinator-lane model).
623    ///
624    /// Same barrier discipline as [`CoordinatorCtx::scratch`]: only call
625    /// after a barrier the target data worker has reached.
626    pub fn data_scratch(&self, i: u32) -> &W {
627        self.scratch(i + 1)
628    }
629
    /// **Target-specific.** Mark the tasklet as cancelled. Host mock
    /// only — on wasm32 the runtime drives cancellation via
    /// `fb_tasklet_cancelled()` and the guest has no `cancel()` lever.
    /// Workers will observe this on their next
    /// [`WorkerCtx::cancelled`] check or barrier wait.
    ///
    /// Idempotent: repeated calls simply re-store `true`.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }
638
639    /// **Behavior diverges across targets.** See the "Source portability"
640    /// section of `docs/grafos/shared-memory-tasklet-programming-model.md`.
641    ///
642    /// On the **host mock** this call:
643    ///
644    /// 1. Returns `Err(Cancelled)` if cancellation has already been
645    ///    committed at the call site.
646    /// 2. Does NOT actually block on anything. The coordinator lane is
647    ///    not a participant in the `std::sync::Barrier` used inside
648    ///    [`CoordinatorCtx::parallel_for_workers`] (that barrier sizes
649    ///    to the data-worker count only).
650    ///
651    /// On **wasm32** ([`super::guest::CoordinatorCtx::barrier`]) this
652    /// same call IS a real cross-lane barrier with the data workers,
653    /// because wasm32 worker 0 re-enters `tasklet_run` alongside
654    /// every other worker and all lanes rendezvous at
655    /// `fb_barrier_wait()`.
656    ///
657    /// The recommended portable pattern: use
658    /// [`CoordinatorCtx::parallel_for_workers`] as the phase boundary
659    /// on host (which already performs the join), and use
660    /// `coord.barrier()?` between phases on wasm32. A single source
661    /// body gated with `#[cfg(target_arch = "wasm32")]` is the honest
662    /// tool here — the SDK does not paper this over.
663    ///
664    /// Phase 48.14 closes Phase 48.3 follow-up #18 (cancellable-barrier
665    /// upgrade) as WONTFIX: the divergence is documented as an
666    /// intentional target-specific behavior, not slated for semantic
667    /// unification.
668    pub fn barrier(&self) -> core::result::Result<(), Cancelled> {
669        if self.cancelled.load(Ordering::SeqCst) {
670            Err(Cancelled)
671        } else {
672            Ok(())
673        }
674    }
675
676    /// Write the coordinator's output bytes. Mirrors the wasm32 guest
677    /// `CoordinatorCtx::set_output`: programmer calls this to emit the
678    /// tasklet output and then returns `Ok(())` from the coordinator
679    /// closure.
680    ///
681    /// On wasm32 the runtime copies `bytes` into the host-managed output
682    /// region and the coordinator's `tasklet_run` returns the number of
683    /// bytes written. Here on the host mock, `launch` collects the
684    /// recorded output and returns it inside [`SharedTaskletResult`].
685    pub fn set_output(&mut self, bytes: &[u8]) {
686        self.output.clear();
687        self.output.extend_from_slice(bytes);
688    }
689
690    /// Cooperatively reconcile fuel with the lease-wide shared pool.
691    ///
692    /// Returns `Ok(())` on a successful debit. Returns
693    /// `Err(FuelExhausted)` when the shared pool is exhausted, in which
694    /// case the coordinator MUST stop computing immediately — there is
695    /// no recovery path; a subsequent fuel-consuming instruction will
696    /// trap fail-closed.
697    ///
698    /// V2 (shared-pool) workers MUST call this at safe points within
699    /// any compute loop that may consume more fuel than the initial
700    /// precharge. On V1 (per-worker-slice) launches this method always
701    /// returns `Err(FuelExhausted)` because no shared pool is installed
702    /// — V1 source should not call it.
703    ///
704    /// `amount == 0` is a silent no-op returning `Ok(())`.
705    ///
706    /// Mirrors [`super::guest::CoordinatorCtx::fuel_checkpoint`] —
707    /// source compiled for either target observes identical
708    /// success/failure semantics.
709    pub fn fuel_checkpoint(&mut self, amount: u64) -> core::result::Result<(), FuelExhausted> {
710        if amount > i64::MAX as u64 {
711            return Err(FuelExhausted);
712        }
713        let result = mock::fb_fuel_checkpoint(amount as i64);
714        if result == 0 {
715            Ok(())
716        } else {
717            Err(FuelExhausted)
718        }
719    }
720
721    /// **Target-specific.** Run a worker phase: invokes `body` once per **data worker lane**
722    /// in parallel, each with its own [`WorkerCtx`]. Returns when every
723    /// data worker has completed (the implicit tasklet-wide barrier).
724    ///
725    /// Phase 48.3 — runs the body on data worker lanes
726    /// `1..worker_count()`. Worker 0 is the coordinator lane and does
727    /// not run the worker closure on either host mock or wasm32
728    /// targets — this matches the wasm32 SDK contract in
729    /// [`super::guest::run_shared_memory_tasklet`]. The number of body
730    /// invocations is therefore `data_worker_count() == worker_count() - 1`.
731    ///
732    /// `body` may compute on shared state (via interior mutability such as
733    /// atomics) and on its own scratch. It may NOT perform authority-bearing
734    /// hostcalls — those are coordinator-only by contract.
735    ///
736    /// This method is host-mock-only. The wasm32 guest drives workers by
737    /// having the runtime call `tasklet_run` once per lane; a program
738    /// writing source for both targets must `#[cfg]`-gate the outer
739    /// entry-point structure (see "Cross-target source patterns" in
740    /// the build guide). The worker closure *body* can still be shared
741    /// verbatim between the two entry points.
742    pub fn parallel_for_workers<F>(&mut self, body: F) -> core::result::Result<(), TaskletError>
743    where
744        F: Fn(&mut WorkerCtx<S, W>) + Sync + Send,
745    {
746        let workers = self.workers;
747        // Validated at builder time; defensive assert for the rare path
748        // where a caller constructed a CoordinatorCtx by hand.
749        assert!(
750            workers >= 2,
751            "shared-memory tasklet requires at least 2 worker lanes \
752             (1 coordinator + >=1 data worker)"
753        );
754        let shared = self.shared.clone();
755        let cancelled = self.cancelled.clone();
756        let shared_fuel_pool = self.shared_fuel_pool.clone();
757        let input_slice: &[u8] = &self.input;
758        // Barrier counts data workers only — the coordinator lane does
759        // not participate in the worker barrier.
760        let data_count = (workers - 1) as usize;
761        let barrier = Arc::new(std::sync::Barrier::new(data_count));
762
763        // Take exclusive references to each scratch slot. We split the Vec
764        // into per-element &mut and hand each lane its own. Worker 0's
765        // scratch slot is unused under the coordinator-lane model but we
766        // still hold its &mut so the index math lines up with wasm32.
767        let mut slots: Vec<&mut W> = self.scratches.iter_mut().collect();
768
769        std::thread::scope(|scope| {
770            let body = &body;
771            let mut handles = Vec::with_capacity(data_count);
772            let mut indexed: Vec<(u16, &mut W)> = slots
773                .drain(..)
774                .enumerate()
775                .map(|(i, s)| (i as u16, s))
776                .collect();
777
778            // Drop worker 0's slot — the coordinator lane does not run
779            // the body. Its scratch slot remains untouched in this
780            // phase, matching wasm32 where worker 0's scratch is
781            // accessible to the coordinator via `coord.scratch(0)` but
782            // never written by `parallel_for_workers`.
783            let _coord_slot = indexed.remove(0);
784
785            for _ in 1..workers {
786                let (idx, scratch_ref) = indexed.remove(0);
787                let shared = shared.clone();
788                let cancelled = cancelled.clone();
789                let barrier = barrier.clone();
790                let input = input_slice;
791                let shared_fuel_pool = shared_fuel_pool.clone();
792                handles.push(scope.spawn(move || {
793                    let slot = WorkerSlot {
794                        worker_index: idx as i32,
795                        worker_count: workers as i32,
796                        cancelled: cancelled.clone(),
797                        barrier: Some(barrier.clone()),
798                        shared_ptr: 0,
799                        shared_len: 0,
800                        scratch_ptr: 0,
801                        scratch_len: 0,
802                        shared_fuel_pool,
803                    };
804                    with_worker_slot(slot, || {
805                        let mut wctx = WorkerCtx {
806                            shared,
807                            scratch: scratch_ref,
808                            worker_index: idx,
809                            worker_count: workers,
810                            cancelled,
811                            barrier,
812                            input,
813                        };
814                        body(&mut wctx);
815                    });
816                }));
817            }
818            for h in handles {
819                let _ = h.join();
820            }
821        });
822
823        if self.cancelled.load(Ordering::SeqCst) {
824            return Err(TaskletError::Cancelled);
825        }
826        Ok(())
827    }
828}
829
830// ---------------------------------------------------------------------------
831// WorkerCtx
832// ---------------------------------------------------------------------------
833
/// Programmer-facing worker context.
///
/// Workers may read shared state, mutate their own scratch, observe
/// cancellation, and synchronize at the implicit tasklet-wide barrier.
/// Workers must not perform authority-bearing hostcalls.
pub struct WorkerCtx<'a, S, W>
where
    S: Send + Sync + 'static,
    W: Send + 'static,
{
    // Shared state visible to every lane; mutation goes through
    // interior mutability inside `S` (e.g. atomics).
    shared: Arc<S>,
    // Exclusive borrow of this lane's private scratch slot — the type
    // system prevents any other lane from aliasing it.
    scratch: &'a mut W,
    // Raw wasm worker index of this lane. Under the coordinator-lane
    // model `parallel_for_workers` only spawns lanes 1..worker_count,
    // so this is >= 1 inside a worker closure.
    worker_index: u16,
    // Total lane count for the tasklet (coordinator + data workers).
    worker_count: u16,
    // Cooperative cancellation flag shared with the coordinator.
    cancelled: Arc<AtomicBool>,
    // Data-worker rendezvous; sized to worker_count - 1 at spawn time,
    // so the coordinator lane is NOT a participant (host-mock
    // semantics — see `WorkerCtx::barrier`).
    barrier: Arc<std::sync::Barrier>,
    // Submission-time input bytes; identical and read-only for all
    // lanes.
    input: &'a [u8],
}
852
853impl<'a, S, W> WorkerCtx<'a, S, W>
854where
855    S: Send + Sync + 'static,
856    W: Send + 'static,
857{
858    /// This worker's lane index in `[0, worker_count())`.
859    pub fn worker_index(&self) -> u16 {
860        self.worker_index
861    }
862
863    /// Total worker count for this tasklet.
864    pub fn worker_count(&self) -> u16 {
865        self.worker_count
866    }
867
868    /// Index of this worker among data workers (excluding the
869    /// coordinator lane).
870    ///
871    /// Data workers are wasm worker indices `1..worker_count()`,
872    /// exposed here as `0..data_worker_count()`. Use this for
873    /// partitioning input across data workers via `partition::range` or
874    /// `partition::tiles`. Mirrors the wasm32 guest helper.
875    ///
876    /// Under the Phase 48.3 coordinator-lane programming model, worker
877    /// 0 only runs the coordinator closure, so this method is only ever
878    /// called from a worker closure invocation where `worker_index() >=
879    /// 1`, and the subtraction is structurally safe.
880    pub fn data_worker_index(&self) -> u32 {
881        (self.worker_index as u32) - 1
882    }
883
884    /// Number of data workers (worker lanes excluding the coordinator
885    /// lane).
886    ///
887    /// Equal to `worker_count() - 1`. Always at least 1 for a valid
888    /// shared-memory tasklet (the SDK builder requires `workers >= 2`).
889    pub fn data_worker_count(&self) -> u32 {
890        (self.worker_count as u32) - 1
891    }
892
893    /// **Behavior diverges across targets.** See the "Source portability"
894    /// section of `docs/grafos/shared-memory-tasklet-programming-model.md`.
895    ///
896    /// Wait at the implicit tasklet-wide barrier. Returns
897    /// [`Err(Cancelled)`] if the tasklet was cancelled by the time this
898    /// worker releases from the barrier.
899    ///
900    /// On the **host mock** this IS a real barrier among the data
901    /// workers: the underlying `std::sync::Barrier` is sized to
902    /// `data_worker_count()` (not `worker_count()`), and the
903    /// coordinator lane is not a participant. That means host-side
904    /// `w.barrier()` rendezvous with other data workers, but NOT with
905    /// the coordinator.
906    ///
907    /// On **wasm32** ([`super::guest::WorkerCtx::barrier`]) this
908    /// rendezvous with the coordinator lane as well, because worker 0
909    /// re-enters `tasklet_run` and participates in `fb_barrier_wait()`.
910    ///
911    /// The asymmetry in a single sentence: on host mock, `coord.barrier()`
912    /// is a no-op-with-cancel-check while `w.barrier()` is a real
913    /// data-worker rendezvous. On wasm32 both are real full-lane
914    /// barriers. Source that relies on cross-lane coordinator/worker
915    /// rendezvous must `#[cfg]`-gate this call.
916    pub fn barrier(&self) -> core::result::Result<(), Cancelled> {
917        self.barrier.wait();
918        if self.cancelled.load(Ordering::SeqCst) {
919            Err(Cancelled)
920        } else {
921            Ok(())
922        }
923    }
924
925    /// Whether the tasklet has been cancelled.
926    pub fn cancelled(&self) -> bool {
927        self.cancelled.load(Ordering::SeqCst)
928    }
929
930    /// Read-only access to shared state. For shared mutation, the program
931    /// uses interior mutability inside `S` (e.g. atomics).
932    pub fn shared(&self) -> &S {
933        &self.shared
934    }
935
936    /// Input bytes provided to the tasklet at submission time.
937    ///
938    /// Mirrors [`CoordinatorCtx::input`]: every worker sees the same
939    /// bytes read-only. Workers must not mutate these bytes; for
940    /// cross-worker communication use the `Shared` region with atomics.
941    ///
942    /// The returned slice is tied to the context's lifetime rather
943    /// than to `&self`, so callers can hold it across a
944    /// `scratch_mut()` borrow.
945    pub fn input(&self) -> &'a [u8] {
946        self.input
947    }
948
949    /// Mutable access to *this* worker's private scratch. The type system
950    /// guarantees that no other worker can touch this slot.
951    pub fn scratch_mut(&mut self) -> &mut W {
952        self.scratch
953    }
954
955    /// **Target-specific.** Read-only access to *this* worker's scratch.
956    /// Host mock only — the wasm32 guest's `WorkerCtx` exposes
957    /// `scratch_mut(&mut self)` only and has no read-only accessor.
958    pub fn scratch(&self) -> &W {
959        self.scratch
960    }
961
962    /// Cooperatively reconcile fuel with the lease-wide shared pool.
963    ///
964    /// Returns `Ok(())` on a successful debit. Returns
965    /// `Err(FuelExhausted)` when the shared pool is exhausted, in which
966    /// case the worker MUST stop computing immediately — there is no
967    /// recovery path; a subsequent fuel-consuming instruction will trap
968    /// fail-closed.
969    ///
970    /// V2 (shared-pool) workers MUST call this at safe points within
971    /// any compute loop that may consume more fuel than the initial
972    /// precharge. On V1 (per-worker-slice) launches this method always
973    /// returns `Err(FuelExhausted)` because no shared pool is installed
974    /// — V1 source should not call it.
975    ///
976    /// `amount == 0` is a silent no-op returning `Ok(())`.
977    ///
978    /// Mirrors [`super::guest::WorkerCtx::fuel_checkpoint`].
979    pub fn fuel_checkpoint(&mut self, amount: u64) -> core::result::Result<(), FuelExhausted> {
980        if amount > i64::MAX as u64 {
981            return Err(FuelExhausted);
982        }
983        let result = mock::fb_fuel_checkpoint(amount as i64);
984        if result == 0 {
985            Ok(())
986        } else {
987            Err(FuelExhausted)
988        }
989    }
990}
991
992// ---------------------------------------------------------------------------
993// SharedRegion / WorkerScratch — typed views over raw bytes.
994//
995// These exist for programs that want raw byte-addressed access to the
996// shared / scratch regions instead of a typed `S` / `W`. The runtime will
997// eventually back these with linear-memory slices; in the mock SDK they
998// are zero-length placeholders.
999// ---------------------------------------------------------------------------
1000
/// Typed wrapper over the shared region of a tasklet.
pub struct SharedRegion<T> {
    // Marker for the element type the region is viewed as; no `T` is
    // actually stored.
    _phantom: PhantomData<T>,
    // Region length in bytes (zero-length placeholder in the mock SDK).
    len: u32,
}

impl<T> SharedRegion<T> {
    /// Returns the length of the shared region in bytes.
    pub fn len_bytes(&self) -> u32 {
        self.len
    }

    /// Returns `true` when the shared region holds no bytes.
    ///
    /// Companion predicate to [`SharedRegion::len_bytes`], following
    /// the Rust convention that a length accessor pairs with an
    /// emptiness check.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
1013
/// Typed wrapper over a single worker's scratch region.
///
/// Intended to be handed out per worker lane so that one lane's
/// scratch view cannot alias another's. NOTE(review): the current
/// placeholder type carries no lifetime parameter, so that isolation
/// is by construction convention rather than enforced by the borrow
/// checker — revisit when the runtime backs this with real
/// linear-memory slices.
pub struct WorkerScratch<T> {
    // Marker for the element type the scratch is viewed as; no `T` is
    // actually stored.
    _phantom: PhantomData<T>,
    // Scratch length in bytes (zero-length placeholder in the mock SDK).
    len: u32,
}

impl<T> WorkerScratch<T> {
    /// Returns the length of the scratch region in bytes.
    pub fn len_bytes(&self) -> u32 {
        self.len
    }

    /// Returns `true` when the scratch region holds no bytes.
    ///
    /// Companion predicate to [`WorkerScratch::len_bytes`], following
    /// the Rust convention that a length accessor pairs with an
    /// emptiness check.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
1029
1030// ---------------------------------------------------------------------------
1031// Tests
1032// ---------------------------------------------------------------------------
1033
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cpu::CpuBuilder;
    use crate::cpu_shared::partition::{range, tiles, Tile};
    use crate::host;
    use std::collections::HashSet;
    use std::sync::atomic::AtomicU32;

    // Shared state for these tests: one atomic counter, mutated by
    // lanes through interior mutability.
    #[derive(Default)]
    struct TestShared {
        counter: AtomicU32,
    }

    // Per-lane scratch: a single private u32 slot.
    #[derive(Default, Clone)]
    struct TestScratch {
        local: u32,
    }

    // Acquire a fresh 4-core lease against a reset mock host so every
    // test starts from a clean slate.
    fn cpu() -> crate::cpu::CpuLease {
        host::reset_mock();
        CpuBuilder::new().cores(4).acquire().expect("acquire")
    }

    #[test]
    fn builder_fluent_api_compiles() {
        // Smoke test: the full builder chain compiles and a
        // coordinator-only launch round-trips input to output.
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(1024)
            .scratch_bytes_per_worker(64)
            .fuel(1_000_000)
            .input(b"hi".to_vec())
            .launch(|coord| {
                assert_eq!(coord.worker_count(), 4);
                assert_eq!(coord.input(), b"hi");
                coord.set_output(b"ok");
                Ok(())
            })
            .expect("launch");
        assert_eq!(result.exit_code, 0);
        assert_eq!(&result.output, b"ok");
    }

    #[test]
    fn rejects_workers_gt_cores() {
        // Requesting more worker lanes than leased cores must fail up
        // front at launch time.
        let lease = cpu();
        let err = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|_c| Ok(()))
            .unwrap_err();
        assert_eq!(err, TaskletError::InvalidWorkerCount);
    }

    #[test]
    fn rejects_oversize_envelope() {
        // shared_bytes + workers * scratch_bytes exceeding
        // max_linear_memory must be rejected at launch.
        let lease = cpu();
        let err = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(100)
            .scratch_bytes_per_worker(50)
            .max_linear_memory(150) // 100 + 4*50 = 300 > 150
            .launch(|_c| Ok(()))
            .unwrap_err();
        assert_eq!(err, TaskletError::MemoryEnvelopeInvalid);
    }

    #[test]
    fn partition_range_divides_evenly() {
        // 100 items over 4 workers: four equal, contiguous quarters.
        assert_eq!(range(100, 0, 4), 0..25);
        assert_eq!(range(100, 1, 4), 25..50);
        assert_eq!(range(100, 2, 4), 50..75);
        assert_eq!(range(100, 3, 4), 75..100);
    }

    #[test]
    fn partition_range_handles_remainder() {
        let r0 = range(10, 0, 3);
        let r1 = range(10, 1, 3);
        let r2 = range(10, 2, 3);
        // Full coverage, no overlap.
        assert_eq!(r0.start, 0);
        assert_eq!(r0.end, r1.start);
        assert_eq!(r1.end, r2.start);
        assert_eq!(r2.end, 10);
        // First (10 % 3) = 1 worker gets one extra.
        assert_eq!(r0.len(), 4);
        assert_eq!(r1.len(), 3);
        assert_eq!(r2.len(), 3);
    }

    #[test]
    fn partition_tiles_covers_full_image() {
        // Union of all 4 workers' 16x16 tiles over a 64x64 image must
        // cover every pixel exactly once.
        let mut covered: HashSet<(u32, u32)> = HashSet::new();
        let mut total_pixels = 0usize;
        for wi in 0..4u16 {
            for Tile { x, y, w, h } in tiles(64, 64, 16, 16, wi, 4) {
                for dy in 0..h {
                    for dx in 0..w {
                        let inserted = covered.insert((x + dx, y + dy));
                        assert!(inserted, "pixel covered twice");
                        total_pixels += 1;
                    }
                }
            }
        }
        assert_eq!(total_pixels, 64 * 64);
        assert_eq!(covered.len(), 64 * 64);
    }

    #[test]
    fn coordinator_ctx_exposes_worker_count() {
        // The coordinator closure observes the launch-time lane count.
        let lease = cpu();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(8)
            .workers(8)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|coord| {
                assert_eq!(coord.worker_count(), 8);
                Ok(())
            })
            .unwrap();
    }

    #[test]
    fn worker_ctx_exposes_worker_index() {
        // Phase 48.3 — under the coordinator-lane model,
        // parallel_for_workers runs the body on data worker lanes
        // 1..worker_count(). With workers=4, the body sees raw wasm
        // worker indices [1, 2, 3] (NOT [0, 1, 2, 3]).
        let lease = cpu();
        let seen = Arc::new(std::sync::Mutex::new(Vec::<u16>::new()));
        let seen_data = Arc::new(std::sync::Mutex::new(Vec::<u32>::new()));
        let seen_clone = seen.clone();
        let seen_data_clone = seen_data.clone();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(move |coord| {
                assert_eq!(coord.data_worker_count(), 3);
                let s = seen_clone.clone();
                let sd = seen_data_clone.clone();
                coord
                    .parallel_for_workers(move |w| {
                        assert_eq!(w.worker_count(), 4);
                        assert_eq!(w.data_worker_count(), 3);
                        s.lock().unwrap().push(w.worker_index());
                        sd.lock().unwrap().push(w.data_worker_index());
                    })
                    .unwrap();
                Ok(())
            })
            .unwrap();
        // Lanes run in parallel, so sort before comparing.
        let mut got = seen.lock().unwrap().clone();
        got.sort();
        assert_eq!(got, vec![1, 2, 3]);
        let mut got_data = seen_data.lock().unwrap().clone();
        got_data.sort();
        assert_eq!(got_data, vec![0, 1, 2]);
    }

    #[test]
    fn shared_region_visible_to_all_workers() {
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(1024)
            .scratch_bytes_per_worker(64)
            .launch(|coord| {
                coord
                    .parallel_for_workers(|w| {
                        w.shared().counter.fetch_add(1, Ordering::SeqCst);
                        w.scratch_mut().local = w.worker_index() as u32;
                    })
                    .unwrap();
                let n = coord.shared().counter.load(Ordering::SeqCst);
                coord.set_output(&n.to_le_bytes());
                Ok(())
            })
            .unwrap();
        // Phase 48.3 — coordinator lane (worker 0) does NOT run the
        // worker closure, so only `data_worker_count() == 3` lanes
        // increment the counter when launched with workers=4.
        assert_eq!(u32::from_le_bytes(result.output.try_into().unwrap()), 3);
    }

    #[test]
    fn worker_ctx_sees_input_bytes() {
        // Every data worker lane observes the same submission-time
        // input bytes as the coordinator.
        let lease = cpu();
        let seen = Arc::new(std::sync::Mutex::new(Vec::<Vec<u8>>::new()));
        let seen_clone = seen.clone();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .input(b"hello workers".to_vec())
            .launch(move |coord| {
                let s = seen_clone.clone();
                assert_eq!(coord.input(), b"hello workers");
                coord
                    .parallel_for_workers(move |w| {
                        assert_eq!(w.input(), b"hello workers");
                        s.lock().unwrap().push(w.input().to_vec());
                    })
                    .unwrap();
                Ok(())
            })
            .unwrap();
        // Phase 48.3 — only the 3 data worker lanes (workers 1..4)
        // run the body; worker 0 is the coordinator lane.
        let got = seen.lock().unwrap().clone();
        assert_eq!(got.len(), 3);
        for v in got {
            assert_eq!(v, b"hello workers");
        }
    }

    #[test]
    fn coordinator_reads_cross_worker_scratch_after_barrier() {
        let lease = cpu();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(4)
            .workers(4)
            .shared_bytes(0)
            .scratch_bytes_per_worker(64)
            .launch(|coord| {
                // Each data worker writes its (index*10) into its
                // scratch. parallel_for_workers joins all data
                // workers before returning, which acts as the
                // barrier.
                coord
                    .parallel_for_workers(|w| {
                        w.scratch_mut().local = (w.worker_index() as u32) * 10;
                    })
                    .unwrap();
                // Now read every worker's scratch from the
                // coordinator. Worker 0 (coordinator lane) was not
                // touched, so its slot stays at the default 0;
                // workers 1, 2, 3 wrote 10, 20, 30. Sum = 60.
                //
                // Coincidence note: the post-Phase-48.3 sum
                // (0 + 10 + 20 + 30) happens to equal the
                // pre-Phase-48.3 sum (0 + 10 + 20 + 30 — same set
                // because worker 0 always wrote 0 anyway). If this
                // assertion's magic numbers are ever changed, the
                // expected value MUST be re-derived against the
                // current "data workers only" model — do NOT
                // assume the old "all lanes participate" math.
                let mut sum: u32 = 0;
                for i in 0..coord.worker_count() {
                    sum += coord.scratch(u32::from(i)).local;
                }
                coord.set_output(&sum.to_le_bytes());
                Ok(())
            })
            .map(|r| {
                assert_eq!(u32::from_le_bytes(r.output.try_into().unwrap()), 60);
            })
            .unwrap();
    }

    #[test]
    fn rejects_single_lane_shared_memory_tasklet() {
        // Phase 48.3 — a 1-lane shared-memory tasklet has a coordinator
        // but no data workers and cannot make progress; the builder
        // must reject up front.
        let lease = cpu();
        let err = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(1)
            .workers(1)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|_c| Ok(()))
            .unwrap_err();
        assert_eq!(err, TaskletError::InvalidWorkerCount);
    }

    #[test]
    fn fuel_checkpoint_succeeds_until_pool_exhausted() {
        // Phase 48.13 W2b — V2 shared-pool launch with a small max_fuel.
        // The coordinator calls fuel_checkpoint repeatedly until the
        // pool exhausts, and the test verifies the deterministic
        // transition from Ok(()) to Err(FuelExhausted).
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(2)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            // workers=2 → precharge math: slice = max(20_000/2/4, 4096)
            // = max(2500, 4096) = 4096; precharge consumes 2*4096 = 8192
            // out of 20_000, leaving 11_808 in the pool for checkpoints.
            .with_max_fuel(20_000)
            .launch(|coord| {
                // Drain the pool in 1024-unit ticks. We don't know
                // the exact starting balance (it depends on the
                // precharge math), but we know it's bounded above
                // by max_fuel and that successive checkpoints must
                // eventually return FuelExhausted.
                let mut succeeded: u32 = 0;
                let mut hit_exhaustion = false;
                for _ in 0..1000 {
                    match coord.fuel_checkpoint(1024) {
                        Ok(()) => succeeded += 1,
                        Err(FuelExhausted) => {
                            hit_exhaustion = true;
                            break;
                        }
                    }
                }
                assert!(hit_exhaustion, "pool must eventually exhaust");
                // After a failed 1024-tick the pool may still hold
                // 0..1023 units. Drain it byte-by-byte until even a
                // single-unit checkpoint fails, then assert any
                // further debit fails.
                for _ in 0..1024 {
                    if coord.fuel_checkpoint(1).is_err() {
                        break;
                    }
                }
                assert_eq!(coord.fuel_checkpoint(1), Err(FuelExhausted));
                assert_eq!(coord.fuel_checkpoint(1024), Err(FuelExhausted));
                // amount == 0 is a silent no-op returning Ok(()) even
                // post-exhaustion.
                assert_eq!(coord.fuel_checkpoint(0), Ok(()));
                coord.set_output(&succeeded.to_le_bytes());
                Ok(())
            })
            .expect("launch");
        let succeeded = u32::from_le_bytes(result.output.try_into().unwrap());
        // Sanity: at least one checkpoint must have succeeded (the pool
        // is large enough to accommodate several 1024-unit debits).
        assert!(succeeded >= 1, "at least one checkpoint must succeed");
    }

    #[test]
    fn fuel_checkpoint_returns_err_when_no_pool_installed() {
        // Phase 48.13 W2b — a launch that does NOT call with_max_fuel
        // simulates a V1 worker. fuel_checkpoint returns
        // Err(FuelExhausted) on every call (other than amount==0).
        let lease = cpu();
        lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(2)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            // No with_max_fuel — V1 simulation.
            .launch(|coord| {
                assert_eq!(coord.fuel_checkpoint(1), Err(FuelExhausted));
                assert_eq!(coord.fuel_checkpoint(1024), Err(FuelExhausted));
                // amount == 0 is still a no-op.
                assert_eq!(coord.fuel_checkpoint(0), Ok(()));
                coord
                    .parallel_for_workers(|w| {
                        assert_eq!(w.fuel_checkpoint(1), Err(FuelExhausted));
                        assert_eq!(w.fuel_checkpoint(0), Ok(()));
                    })
                    .unwrap();
                Ok(())
            })
            .expect("launch");
    }

    #[test]
    fn cancel_propagates_to_workers() {
        // Cancelling from the coordinator closure surfaces as
        // TaskletError::Cancelled from launch.
        let lease = cpu();
        let result = lease
            .cpu()
            .shared_memory_tasklet::<TestShared, TestScratch>()
            .cores(2)
            .workers(2)
            .shared_bytes(0)
            .scratch_bytes_per_worker(0)
            .launch(|coord| {
                coord.cancel();
                assert!(coord.cancelled());
                Ok(())
            });
        assert_eq!(result.unwrap_err(), TaskletError::Cancelled);
    }
}
1446}