Skip to content

Recipe 22: Deterministic Latency Audio Over Borrowed Hardware

Situation

Audio processing requires deterministic latency. Buffer underruns produce audible glitches. Traditional audio systems pre-allocate fixed resources on a single machine. You want to borrow CPU and memory from the fabric for the exact duration of the audio session, with hard guarantees that resources will not be reclaimed mid-stream.

The key insight: grafos-dsp pipelines track DspStats.overruns (blocks where processing exceeded the block duration). If overruns are zero, the pipeline met real-time constraints. Admission control and lease renewal close the loop: check before you start, and keep renewing while you run.

What You Build

A DSP pipeline (FFT, FIR filter, gain, resample) where:

  • grafos_scheduler::AdmissionController pre-checks CPU and memory availability
  • Leases are acquired for CPU and memory resources
  • grafos_leasekit::RenewalManager drives lease renewal in the DSP tick loop
  • DspStats.overruns is the measurable indicator of resource starvation

Building Blocks

  • grafos_dsp::pipeline::{DspPipeline, DspPipelineHandle, DspSource, DspSink, VecSource, CollectSink} — pipeline builder and execution — source
  • grafos_dsp::stages::{FftStage, FirFilterStage, GainStage, ResampleStage} — built-in processing stages — source
  • grafos_dsp::types::{DspConfig, DspStats, Block, Sample} — configuration and statistics — source
  • grafos_dsp::placement::StagePlacement — stage placement hints — source
  • grafos_scheduler::AdmissionController — capacity pre-check — source
  • grafos_leasekit::{RenewalManager, RenewalPolicy} — lease renewal — source

Design

Pipeline Topology

Source -> FFT -> FIR Filter -> Gain -> Resample -> Sink

Each stage implements DspStage::process(&mut self, block: &Block) -> Result<Block>. The pipeline processes blocks sequentially through all stages. DspConfig sets the block size, sample rate, and channel count.

Admission Pre-Check

Before acquiring leases, ask AdmissionController whether the fabric has enough capacity. This avoids half-provisioned sessions where you get memory but not CPU.

Lease Renewal in the Tick Loop

The DSP pipeline runs block-by-block. Between blocks (or every N blocks), call RenewalManager::tick(now). This keeps leases alive for the session duration without a separate renewal thread.

Overrun Detection

DspPipelineHandle::run() compares per-block processing time against the block duration (block_size / sample_rate). If processing exceeds the block duration, stats.overruns increments. Zero overruns = real-time guarantees met.

Walkthrough (Implementation Sketch)

Core grafOS API Path

Use current scheduler admission APIs to check capacity, then acquire memory and run the DSP pipeline with renewal tracking:

use grafos_core::ResourceKind;
use grafos_scheduler::{
AdmissionController, CapacityLedger, NodeConstraint, ReservationStore,
};
use grafos_std::mem::MemBuilder;
let block_size = 256u64;
let now = unix_time_secs();
let ledger = CapacityLedger::new();
// Populate ledger from fabric inventory before checking admission.
let controller = AdmissionController::new(ledger, ReservationStore::new());
let decision = controller.check(
9001, // holder id
ResourceKind::Mem,
block_size * 4 * 16, // bytes
&NodeConstraint::Any,
now,
None,
)?;
let mem_lease = MemBuilder::new()
.min_bytes(block_size * 4 * 16)
.lease_secs(300)
.acquire()?;
# let _ = (decision, mem_lease);
# Ok::<(), Box<dyn std::error::Error>>(())

1. Admission Check

use grafos_core::ResourceKind;
use grafos_scheduler::{
AdmissionController, CapacityLedger, NodeConstraint, ReservationStore,
};
let ledger = CapacityLedger::new();
// ... populate ledger from fabric inventory ...
let controller = AdmissionController::new(ledger, ReservationStore::new());
let decision = controller.check(
9001,
ResourceKind::Mem,
block_size * 4 * 16,
&NodeConstraint::Any,
now,
None,
)?;

2. Lease CPU and Memory

use grafos_std::mem::MemBuilder;
let mem_lease = MemBuilder::new()
.min_bytes(block_size * 4 * 16) // buffer pool for pipeline
.acquire()?;

3. Build the DSP Pipeline

use grafos_dsp::pipeline::{DspPipeline, VecSource, CollectSink};
use grafos_dsp::stages::{FftStage, FirFilterStage, GainStage, ResampleStage};
use grafos_dsp::types::DspConfig;
use grafos_dsp::placement::StagePlacement;
let config = DspConfig {
block_size: 256,
sample_rate: 48_000,
channels: 2,
};
// Low-pass FIR coefficients (example: 31-tap)
let fir_coeffs = design_lowpass_fir(31, 8000.0, 48_000.0);
let pipeline = DspPipeline::new(config)
.source(VecSource::new(input_blocks))
.stage(FftStage::new(), StagePlacement::default())
.stage(FirFilterStage::new(fir_coeffs), StagePlacement::default())
.stage(GainStage::new(0.8), StagePlacement::default())
.stage(ResampleStage::new(44_100), StagePlacement::default())
.sink(CollectSink::new())
.build()?;

4. Run with Tick-Based Renewal

use grafos_leasekit::{RenewalManager, RenewalPolicy};
let mut renewals = RenewalManager::new();
let session_ttl = 300; // 5 minute session
let now = unix_time_secs();
renewals.register(0, now + session_ttl, RenewalPolicy::default());
let mut handle = pipeline;
let stats = handle.run()?;
// In a real streaming scenario, interleave renewal ticks:
// for each block batch {
// let batch_stats = handle.run_batch(batch)?;
// let summary = renewals.tick(unix_time_secs());
// if !summary.near_expiry.is_empty() { checkpoint_or_shed_work(); }
// }

5. Check Results

assert_eq!(stats.overruns, 0, "real-time constraint violated");
println!(
"processed {} blocks, avg latency {}us, max {}us, jitter {}us",
stats.blocks_processed,
stats.avg_latency_us,
stats.max_latency_us,
stats.jitter_us,
);

6. Session End

Drop the pipeline handle. Stop calling renewals.tick(). Leases expire and resources return to the fabric.

Failure Modes

  • Overruns detected: the pipeline ran slower than real-time. Possible causes: insufficient CPU, contention from other workloads, or too many pipeline stages. Reduce pipeline complexity or request more CPU cores.
  • Lease near expiry mid-session: RenewalManager::tick() reports the lease id in summary.near_expiry. Checkpoint or shed work before expiry; continuing after the lease is lost risks writing to reclaimed memory.
  • Admission denied: not enough capacity in the fabric. Wait and retry, or reduce resource requirements.

Observability

  • DspStats.overruns — primary real-time health indicator
  • DspStats.max_latency_us and jitter_us — latency envelope
  • RenewalSummary from tick() — lease health per renewal cycle
  • Admission accept/deny rate — capacity planning signal

Variations

  • GPU-accelerated FFT via FftStage::with_gpu() (requires gpu feature and GPU lease)
  • Multi-pipeline mixing with MixerStage for live audio applications
  • IIR filters via IirFilterStage::new(b_coeffs, a_coeffs) for biquad EQ
  • Stereo processing: set DspConfig.channels = 2, stages handle interleaved channels automatically