grafos_dsp/
pipeline.rs

1//! DSP pipeline builder and execution engine.
2
3extern crate alloc;
4use alloc::boxed::Box;
5use alloc::vec::Vec;
6
7use grafos_std::error::FabricError;
8
9use crate::placement::StagePlacement;
10use crate::stages::DspStage;
11use crate::types::{Block, DspConfig, DspStats};
12
13/// Source of audio blocks for a DSP pipeline.
14pub trait DspSource {
15    fn next_block(&mut self) -> Result<Option<Block>, FabricError>;
16}
17
18/// Sink for audio blocks at the end of a DSP pipeline.
19pub trait DspSink {
20    fn accept_block(&mut self, block: Block) -> Result<(), FabricError>;
21}
22
23/// A collecting sink that stores all output blocks.
24pub struct CollectSink {
25    blocks: Vec<Block>,
26}
27
28impl CollectSink {
29    pub fn new() -> Self {
30        Self { blocks: Vec::new() }
31    }
32
33    pub fn into_blocks(self) -> Vec<Block> {
34        self.blocks
35    }
36}
37
38impl Default for CollectSink {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl DspSink for CollectSink {
45    fn accept_block(&mut self, block: Block) -> Result<(), FabricError> {
46        self.blocks.push(block);
47        Ok(())
48    }
49}
50
51/// A source that yields blocks from a Vec.
52pub struct VecSource {
53    blocks: Vec<Block>,
54    index: usize,
55}
56
57impl VecSource {
58    pub fn new(blocks: Vec<Block>) -> Self {
59        Self { blocks, index: 0 }
60    }
61}
62
63impl DspSource for VecSource {
64    fn next_block(&mut self) -> Result<Option<Block>, FabricError> {
65        if self.index < self.blocks.len() {
66            let block = self.blocks[self.index].clone();
67            self.index += 1;
68            Ok(Some(block))
69        } else {
70            Ok(None)
71        }
72    }
73}
74
75struct StageEntry {
76    stage: Box<dyn DspStage>,
77    _placement: StagePlacement,
78}
79
80/// Builder for constructing a DSP processing pipeline.
81pub struct DspPipeline {
82    config: DspConfig,
83    source: Option<Box<dyn DspSource>>,
84    stages: Vec<StageEntry>,
85    sink: Option<Box<dyn DspSink>>,
86}
87
88impl DspPipeline {
89    /// Create a new DSP pipeline with the given configuration.
90    pub fn new(config: DspConfig) -> Self {
91        Self {
92            config,
93            source: None,
94            stages: Vec::new(),
95            sink: None,
96        }
97    }
98
99    /// Set the audio input source.
100    pub fn source(mut self, source: impl DspSource + 'static) -> Self {
101        self.source = Some(Box::new(source));
102        self
103    }
104
105    /// Add a processing stage with placement hint.
106    pub fn stage(mut self, stage: impl DspStage + 'static, placement: StagePlacement) -> Self {
107        self.stages.push(StageEntry {
108            stage: Box::new(stage),
109            _placement: placement,
110        });
111        self
112    }
113
114    /// Set the audio output sink.
115    pub fn sink(mut self, sink: impl DspSink + 'static) -> Self {
116        self.sink = Some(Box::new(sink));
117        self
118    }
119
120    /// Build the pipeline into a runnable handle.
121    pub fn build(self) -> Result<DspPipelineHandle, FabricError> {
122        let source = self.source.ok_or(FabricError::CapacityExceeded)?;
123        let sink = self.sink.ok_or(FabricError::CapacityExceeded)?;
124        Ok(DspPipelineHandle {
125            config: self.config,
126            source,
127            stages: self.stages,
128            sink,
129        })
130    }
131}
132
133/// A built DSP pipeline ready for execution.
134pub struct DspPipelineHandle {
135    config: DspConfig,
136    source: Box<dyn DspSource>,
137    stages: Vec<StageEntry>,
138    sink: Box<dyn DspSink>,
139}
140
141impl DspPipelineHandle {
142    /// Run the pipeline to completion, processing all blocks from source to sink.
143    pub fn run(&mut self) -> Result<DspStats, FabricError> {
144        let mut stats = DspStats::default();
145        let mut min_latency_us: u64 = u64::MAX;
146        let mut total_latency_us: u64 = 0;
147
148        // Block duration in microseconds: (block_size / sample_rate) * 1_000_000
149        let block_duration_us = if self.config.sample_rate > 0 {
150            (self.config.block_size as u64 * 1_000_000) / self.config.sample_rate as u64
151        } else {
152            0
153        };
154
155        while let Some(block) = self.source.next_block()? {
156            let start = Self::timestamp_us();
157
158            let mut current = block;
159            for entry in self.stages.iter_mut() {
160                current = entry.stage.process(&current)?;
161            }
162
163            self.sink.accept_block(current)?;
164
165            let end = Self::timestamp_us();
166            let elapsed = end.saturating_sub(start);
167
168            stats.blocks_processed += 1;
169            total_latency_us += elapsed;
170
171            if elapsed > stats.max_latency_us {
172                stats.max_latency_us = elapsed;
173            }
174            if elapsed < min_latency_us {
175                min_latency_us = elapsed;
176            }
177
178            // Overrun detection: processing took longer than the block duration
179            if block_duration_us > 0 && elapsed > block_duration_us {
180                stats.overruns += 1;
181            }
182        }
183
184        if stats.blocks_processed > 0 {
185            stats.avg_latency_us = total_latency_us / stats.blocks_processed;
186            stats.jitter_us = stats.max_latency_us.saturating_sub(min_latency_us);
187        }
188
189        Ok(stats)
190    }
191
192    #[cfg(feature = "std")]
193    fn timestamp_us() -> u64 {
194        // Use std::time for host-side timing
195        use std::time::{SystemTime, UNIX_EPOCH};
196        SystemTime::now()
197            .duration_since(UNIX_EPOCH)
198            .map(|d| d.as_micros() as u64)
199            .unwrap_or(0)
200    }
201
202    #[cfg(not(feature = "std"))]
203    fn timestamp_us() -> u64 {
204        // On bare metal, return 0 — timing comes from hardware counters
205        0
206    }
207}
208
209#[cfg(all(test, feature = "std"))]
210mod tests {
211    use super::*;
212    use crate::stages::DspStage;
213    use crate::types::{Block, Sample};
214    use alloc::vec;
215
216    /// A stage that sleeps for a configurable duration per block,
217    /// used to intentionally trigger overrun detection.
218    struct SlowStage {
219        delay: std::time::Duration,
220    }
221
222    impl DspStage for SlowStage {
223        fn process(&mut self, block: &Block) -> Result<Block, FabricError> {
224            std::thread::sleep(self.delay);
225            Ok(block.clone())
226        }
227    }
228
229    fn make_block(block_size: usize, sample_rate: u32) -> Block {
230        Block {
231            data: vec![0.0 as Sample; block_size],
232            sample_rate,
233            channels: 1,
234        }
235    }
236
237    #[test]
238    fn overrun_detected_for_slow_stage() {
239        // 48kHz, 256 samples per block => block duration = 256/48000 ~= 5333 us
240        let config = DspConfig {
241            block_size: 256,
242            sample_rate: 48_000,
243            channels: 1,
244        };
245
246        let blocks = vec![
247            make_block(256, 48_000),
248            make_block(256, 48_000),
249            make_block(256, 48_000),
250        ];
251
252        // Sleep 20ms per block — well over the ~5.3ms block duration
253        let slow = SlowStage {
254            delay: std::time::Duration::from_millis(20),
255        };
256
257        let mut handle = DspPipeline::new(config)
258            .source(VecSource::new(blocks))
259            .stage(slow, StagePlacement::default())
260            .sink(CollectSink::new())
261            .build()
262            .unwrap();
263
264        let stats = handle.run().unwrap();
265
266        assert_eq!(stats.blocks_processed, 3);
267        assert_eq!(stats.overruns, 3, "all 3 blocks should be overruns");
268    }
269
270    #[test]
271    fn no_overrun_for_fast_stage() {
272        // 48kHz, 256 samples => ~5333 us block duration
273        // GainStage is trivially fast, no overruns expected
274        let config = DspConfig {
275            block_size: 256,
276            sample_rate: 48_000,
277            channels: 1,
278        };
279
280        let blocks = vec![make_block(256, 48_000), make_block(256, 48_000)];
281
282        let gain = crate::stages::GainStage::new(1.0);
283
284        let mut handle = DspPipeline::new(config)
285            .source(VecSource::new(blocks))
286            .stage(gain, StagePlacement::default())
287            .sink(CollectSink::new())
288            .build()
289            .unwrap();
290
291        let stats = handle.run().unwrap();
292
293        assert_eq!(stats.blocks_processed, 2);
294        assert_eq!(stats.overruns, 0, "fast stage should not trigger overruns");
295    }
296}