Recipe 22: Deterministic Latency Audio Over Borrowed Hardware
Situation
Audio processing requires deterministic latency. Buffer underruns produce audible glitches. Traditional audio systems pre-allocate fixed resources on a single machine. You want to borrow CPU and memory from the fabric for the exact duration of the audio session, with hard guarantees that resources will not be reclaimed mid-stream.
The key insight: grafos-dsp pipelines track DspStats.overruns (blocks where processing exceeded the block
duration). If overruns are zero, the pipeline met real-time constraints. Admission control and lease renewal
close the loop: check before you start, and keep renewing while you run.
What You Build
A DSP pipeline (FFT, FIR filter, gain, resample) where:
- grafos_scheduler::AdmissionController pre-checks CPU and memory availability
- Leases are acquired for CPU and memory resources
- grafos_leasekit::RenewalManager drives lease renewal in the DSP tick loop
- DspStats.overruns is the measurable indicator of resource starvation
Building Blocks
- grafos_dsp::pipeline::{DspPipeline, DspPipelineHandle, DspSource, DspSink, VecSource, CollectSink}: pipeline builder and execution
- grafos_dsp::stages::{FftStage, FirFilterStage, GainStage, ResampleStage}: built-in processing stages
- grafos_dsp::types::{DspConfig, DspStats, Block, Sample}: configuration and statistics
- grafos_dsp::placement::StagePlacement: stage placement hints
- grafos_scheduler::AdmissionController: capacity pre-check
- grafos_leasekit::{RenewalManager, RenewalPolicy}: lease renewal
Design
Pipeline Topology
    Source -> FFT -> FIR Filter -> Gain -> Resample -> Sink

Each stage implements DspStage::process(&mut self, block: &Block) -> Result<Block>. The pipeline processes
blocks sequentially through all stages. DspConfig sets the block size, sample rate, and channel count.
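The sequential hand-off between stages can be sketched without the grafos_dsp crate. The following is a minimal, std-only illustration under stated assumptions: the `Stage` trait, the `Gain` and `LengthCheck` types, and the `run_chain` function are hypothetical stand-ins rather than the grafos_dsp API, and a block is simplified to a `Vec<f32>`.

```rust
/// Hypothetical stand-in for a processing stage (not grafos_dsp's DspStage).
/// A block is simplified to a slice of f32 samples.
trait Stage {
    fn process(&mut self, block: &[f32]) -> Result<Vec<f32>, String>;
}

/// Multiplies every sample by a fixed factor.
struct Gain(f32);

impl Stage for Gain {
    fn process(&mut self, block: &[f32]) -> Result<Vec<f32>, String> {
        Ok(block.iter().map(|s| s * self.0).collect())
    }
}

/// Rejects blocks whose length differs from the configured block size.
struct LengthCheck(usize);

impl Stage for LengthCheck {
    fn process(&mut self, block: &[f32]) -> Result<Vec<f32>, String> {
        if block.len() != self.0 {
            return Err(format!("expected {} samples, got {}", self.0, block.len()));
        }
        Ok(block.to_vec())
    }
}

/// Feeds one block through every stage in order.
/// The first stage that returns an error stops the chain.
fn run_chain(stages: &mut [Box<dyn Stage>], input: &[f32]) -> Result<Vec<f32>, String> {
    let mut current = input.to_vec();
    for stage in stages.iter_mut() {
        current = stage.process(&current)?;
    }
    Ok(current)
}
```

Each stage takes the previous stage's output by reference and returns a new block, so a failing stage leaves earlier results untouched and the error propagates to the caller.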
Admission Pre-Check
Before acquiring leases, ask AdmissionController whether the fabric has enough capacity. This avoids
half-provisioned sessions where you get memory but not CPU.
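The all-or-nothing idea behind the pre-check can be illustrated with a small std-only sketch. The `Kind` enum and `admit_all` function below are hypothetical and do not reflect the AdmissionController API; they only show why checking every resource before acquiring any of them avoids a half-provisioned session.

```rust
use std::collections::HashMap;

/// Hypothetical resource kinds for this sketch (not grafos_core::ResourceKind).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Kind {
    CpuCores,
    MemBytes,
}

/// Returns Ok only if every requested resource fits in the free capacity.
/// On failure, reports the first resource that falls short, so the caller
/// never acquires one lease while another is unavailable.
fn admit_all(free: &HashMap<Kind, u64>, wanted: &[(Kind, u64)]) -> Result<(), Kind> {
    for &(kind, amount) in wanted {
        // A kind missing from the ledger counts as zero free capacity.
        let available = free.get(&kind).copied().unwrap_or(0);
        if available < amount {
            return Err(kind);
        }
    }
    Ok(())
}
```

With 64 KiB of free memory and one free core, a request for 16,384 bytes plus one core passes, while the same request with two cores is rejected as a whole.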
Lease Renewal in the Tick Loop
The DSP pipeline runs block-by-block. Between blocks (or every N blocks), call RenewalManager::tick(now).
This keeps leases alive for the session duration without a separate renewal thread.
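A rough, std-only sketch of renewal driven from the processing loop follows. The `Lease` and `Renewals` types and the `run_session` helper are hypothetical, use a simulated millisecond clock, and are not the grafos_leasekit API; a real renewal would call the fabric and could fail.

```rust
/// Hypothetical lease record: an id and an absolute expiry time in milliseconds.
struct Lease {
    id: u32,
    expires_at_ms: u64,
}

/// Hypothetical tracker that renews leases from inside the processing loop.
struct Renewals {
    leases: Vec<Lease>,
    ttl_ms: u64,          // lifetime granted by each renewal
    renew_within_ms: u64, // renew once a lease is this close to expiry
}

impl Renewals {
    /// Extends every lease inside the renewal window and returns the ids renewed.
    /// This sketch does not model lost leases: an already expired lease is
    /// simply extended again, which a real system would treat as a failure.
    fn tick(&mut self, now_ms: u64) -> Vec<u32> {
        let mut renewed = Vec::new();
        for lease in &mut self.leases {
            if lease.expires_at_ms.saturating_sub(now_ms) <= self.renew_within_ms {
                lease.expires_at_ms = now_ms + self.ttl_ms;
                renewed.push(lease.id);
            }
        }
        renewed
    }
}

/// Runs `blocks` iterations and calls `tick` every `every_n` blocks.
/// Returns the total number of renewals performed.
fn run_session(r: &mut Renewals, start_ms: u64, blocks: u64, every_n: u64, block_ms: u64) -> usize {
    let mut renewal_count = 0;
    for i in 0..blocks {
        // Simulated clock; a real loop would read the system clock instead.
        let now_ms = start_ms + i * block_ms;
        // ... process block i here ...
        if i % every_n == 0 {
            renewal_count += r.tick(now_ms).len();
        }
    }
    renewal_count
}
```

Ticking every N blocks keeps the renewal cost off most iterations; N must be small enough that the interval between ticks stays well inside the renewal window.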
Overrun Detection
DspPipelineHandle::run() compares per-block processing time against the block duration
(block_size / sample_rate). If processing exceeds the block duration, stats.overruns increments.
Zero overruns = real-time guarantees met.
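The comparison described above can be sketched with the standard library alone. The function names below are hypothetical and this is not how DspPipelineHandle is implemented; the sketch only shows the budget arithmetic and the overrun count.

```rust
use std::time::{Duration, Instant};

/// Time budget for one block: block_size / sample_rate seconds.
/// Integer nanosecond arithmetic avoids floating-point rounding.
/// Panics if sample_rate is zero.
fn block_duration(block_size: u32, sample_rate: u32) -> Duration {
    Duration::from_nanos(u64::from(block_size) * 1_000_000_000 / u64::from(sample_rate))
}

/// Measures how long one unit of work takes.
fn time_block<F: FnMut()>(mut work: F) -> Duration {
    let start = Instant::now();
    work();
    start.elapsed()
}

/// Counts how many measured processing times exceeded the budget.
fn count_overruns(budget: Duration, measured: &[Duration]) -> u64 {
    measured.iter().filter(|&&t| t > budget).count() as u64
}
```

With the walkthrough's settings (256 samples at 48 kHz), the budget is about 5.33 ms per block, so any block that takes longer than that counts as an overrun.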
Walkthrough (Implementation Sketch)
Core grafOS API Path
Use current scheduler admission APIs to check capacity, then acquire memory and run the DSP pipeline with renewal tracking:
    use grafos_core::ResourceKind;
    use grafos_scheduler::{
        AdmissionController, CapacityLedger, NodeConstraint, ReservationStore,
    };
    use grafos_std::mem::MemBuilder;

    let block_size = 256u64;
    let now = unix_time_secs();
    let ledger = CapacityLedger::new();
    // Populate ledger from fabric inventory before checking admission.

    let controller = AdmissionController::new(ledger, ReservationStore::new());
    let decision = controller.check(
        9001, // holder id
        ResourceKind::Mem,
        block_size * 4 * 16, // bytes
        &NodeConstraint::Any,
        now,
        None,
    )?;

    let mem_lease = MemBuilder::new()
        .min_bytes(block_size * 4 * 16)
        .lease_secs(300)
        .acquire()?;
    # let _ = (decision, mem_lease);
    # Ok::<(), Box<dyn std::error::Error>>(())

1. Admission Check
    use grafos_core::ResourceKind;
    use grafos_scheduler::{
        AdmissionController, CapacityLedger, NodeConstraint, ReservationStore,
    };

    let ledger = CapacityLedger::new();
    // ... populate ledger from fabric inventory ...

    let controller = AdmissionController::new(ledger, ReservationStore::new());
    let decision = controller.check(
        9001,
        ResourceKind::Mem,
        block_size * 4 * 16,
        &NodeConstraint::Any,
        now,
        None,
    )?;

2. Lease CPU and Memory
    use grafos_std::mem::MemBuilder;

    // Only the memory lease is shown here.
    let mem_lease = MemBuilder::new()
        .min_bytes(block_size * 4 * 16) // buffer pool for pipeline
        .acquire()?;

3. Build the DSP Pipeline
    use grafos_dsp::pipeline::{DspPipeline, VecSource, CollectSink};
    use grafos_dsp::stages::{FftStage, FirFilterStage, GainStage, ResampleStage};
    use grafos_dsp::types::DspConfig;
    use grafos_dsp::placement::StagePlacement;

    let config = DspConfig {
        block_size: 256,
        sample_rate: 48_000,
        channels: 2,
    };

    // Low-pass FIR coefficients (example: 31-tap).
    // design_lowpass_fir and input_blocks are not defined in this recipe.
    let fir_coeffs = design_lowpass_fir(31, 8000.0, 48_000.0);

    let pipeline = DspPipeline::new(config)
        .source(VecSource::new(input_blocks))
        .stage(FftStage::new(), StagePlacement::default())
        .stage(FirFilterStage::new(fir_coeffs), StagePlacement::default())
        .stage(GainStage::new(0.8), StagePlacement::default())
        .stage(ResampleStage::new(44_100), StagePlacement::default())
        .sink(CollectSink::new())
        .build()?;

4. Run with Tick-Based Renewal
    use grafos_leasekit::{RenewalManager, RenewalPolicy};

    let mut renewals = RenewalManager::new();
    let session_ttl = 300; // 5 minute session
    let now = unix_time_secs();
    renewals.register(0, now + session_ttl, RenewalPolicy::default());

    let mut handle = pipeline;
    let stats = handle.run()?;

    // In a real streaming scenario, interleave renewal ticks:
    // for each block batch {
    //     let batch_stats = handle.run_batch(batch)?;
    //     let summary = renewals.tick(unix_time_secs());
    //     if !summary.near_expiry.is_empty() { checkpoint_or_shed_work(); }
    // }

5. Check Results
    assert_eq!(stats.overruns, 0, "real-time constraint violated");
    println!(
        "processed {} blocks, avg latency {}us, max {}us, jitter {}us",
        stats.blocks_processed,
        stats.avg_latency_us,
        stats.max_latency_us,
        stats.jitter_us,
    );

6. Session End
Drop the pipeline handle. Stop calling renewals.tick(). Leases expire and resources return to the fabric.
Failure Modes
- Overruns detected: the pipeline ran slower than real-time. Possible causes: insufficient CPU, contention from other workloads, or too many pipeline stages. Reduce pipeline complexity or request more CPU cores.
- Lease near expiry mid-session: RenewalManager::tick() reports the lease id in summary.near_expiry. Checkpoint or shed work before expiry; continuing after the lease is lost risks writing to reclaimed memory.
- Admission denied: not enough capacity in the fabric. Wait and retry, or reduce resource requirements.
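One way to combine "retry" and "reduce resource requirements" after a denial is sketched below. It is a hypothetical, std-only illustration: `Outcome` and `admit_with_fallback` are invented for this example, the capacity check is passed in as a closure, and halving the request is just one possible fallback policy.

```rust
/// Hypothetical outcome of a capacity request with fallback.
#[derive(Debug, PartialEq)]
enum Outcome {
    Admitted { bytes: u64, attempts: u32 },
    GaveUp,
}

/// Retries a capacity check, halving the request after each denial,
/// until it is admitted or the request drops below `min_bytes`.
fn admit_with_fallback<F>(mut check: F, want_bytes: u64, min_bytes: u64) -> Outcome
where
    F: FnMut(u64) -> bool,
{
    let mut bytes = want_bytes;
    let mut attempts = 0;
    while bytes > 0 && bytes >= min_bytes {
        attempts += 1;
        if check(bytes) {
            return Outcome::Admitted { bytes, attempts };
        }
        // A real caller would wait (with backoff) here before retrying.
        bytes /= 2;
    }
    Outcome::GaveUp
}
```

The `min_bytes` floor matters for audio: below some buffer pool size the pipeline cannot run at all, so giving up is preferable to starting a session that is bound to overrun.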
Observability
- DspStats.overruns: primary real-time health indicator
- DspStats.max_latency_us and jitter_us: latency envelope
- RenewalSummary from tick(): lease health per renewal cycle
- Admission accept/deny rate: capacity planning signal
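To make the latency envelope concrete, the sketch below derives average, maximum, and jitter from a list of per-block latencies. The `LatencySummary` type and `summarize` function are hypothetical, and jitter is taken as max minus min, which is an assumption; grafos_dsp may define jitter_us differently.

```rust
/// Hypothetical summary of per-block latencies, in microseconds.
#[derive(Debug, PartialEq)]
struct LatencySummary {
    avg_us: u64,
    max_us: u64,
    jitter_us: u64,
}

/// Summarizes per-block latencies. Returns None for an empty input.
/// Jitter is computed as max - min here (an assumption for this sketch).
fn summarize(latencies_us: &[u64]) -> Option<LatencySummary> {
    let max_us = *latencies_us.iter().max()?;
    let min_us = *latencies_us.iter().min()?;
    let sum: u64 = latencies_us.iter().sum();
    Some(LatencySummary {
        avg_us: sum / latencies_us.len() as u64,
        max_us,
        jitter_us: max_us - min_us,
    })
}
```

A low average with a high maximum points to occasional stalls rather than sustained overload, which is why the maximum and jitter are worth watching alongside the overrun count.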
Variations
- GPU-accelerated FFT via FftStage::with_gpu() (requires gpu feature and GPU lease)
- Multi-pipeline mixing with MixerStage for live audio applications
- IIR filters via IirFilterStage::new(b_coeffs, a_coeffs) for biquad EQ
- Stereo processing: set DspConfig.channels = 2, stages handle interleaved channels automatically