grafos_stream/
sink.rs

1//! Built-in sink implementations.
2//!
3//! | Type | Description |
4//! |------|-------------|
5//! | [`CollectSink<T>`] | Collects all items into a `Vec<T>` |
6//! | [`CountSink<T>`] | Counts items received |
7//! | [`FnSink<T, F>`] | Wraps a closure as a sink |
8
9extern crate alloc;
10use alloc::vec::Vec;
11
12use grafos_std::error::FabricError;
13
14use crate::stage::Sink;
15
16/// Sink that collects all items into a `Vec<T>`.
17///
18/// After the pipeline completes, call [`into_vec()`](CollectSink::into_vec)
19/// to retrieve the collected items.
20///
21/// # Example
22///
23/// ```rust
24/// use grafos_stream::sink::CollectSink;
25/// use grafos_stream::stage::Sink;
26///
27/// let mut sink = CollectSink::new();
28/// sink.accept(1).unwrap();
29/// sink.accept(2).unwrap();
30/// assert_eq!(sink.into_vec(), vec![1, 2]);
31/// ```
32pub struct CollectSink<T> {
33    items: Vec<T>,
34}
35
36impl<T> CollectSink<T> {
37    /// Create an empty collecting sink.
38    pub fn new() -> Self {
39        CollectSink { items: Vec::new() }
40    }
41
42    /// Consume the sink and return all accepted items in arrival order.
43    pub fn into_vec(self) -> Vec<T> {
44        self.items
45    }
46}
47
48impl<T> Default for CollectSink<T> {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl<T> Sink<T> for CollectSink<T> {
55    fn accept(&mut self, item: T) -> Result<(), FabricError> {
56        self.items.push(item);
57        Ok(())
58    }
59}
60
61/// Sink that counts items received.
62///
63/// # Example
64///
65/// ```rust
66/// use grafos_stream::sink::CountSink;
67/// use grafos_stream::stage::Sink;
68///
69/// let mut sink: CountSink<u32> = CountSink::new();
70/// sink.accept(10).unwrap();
71/// sink.accept(20).unwrap();
72/// assert_eq!(sink.count(), 2);
73/// ```
74pub struct CountSink<T> {
75    count: u64,
76    _marker: core::marker::PhantomData<T>,
77}
78
79impl<T> CountSink<T> {
80    /// Create a count sink initialized to zero.
81    pub fn new() -> Self {
82        CountSink {
83            count: 0,
84            _marker: core::marker::PhantomData,
85        }
86    }
87
88    /// Number of items accepted so far.
89    pub fn count(&self) -> u64 {
90        self.count
91    }
92}
93
94impl<T> Default for CountSink<T> {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100impl<T> Sink<T> for CountSink<T> {
101    fn accept(&mut self, _item: T) -> Result<(), FabricError> {
102        self.count += 1;
103        Ok(())
104    }
105}
106
107/// Sink that wraps a closure.
108///
109/// # Example
110///
111/// ```rust
112/// use grafos_stream::sink::FnSink;
113/// use grafos_stream::stage::Sink;
114///
115/// let mut total = 0u64;
116/// {
117///     let mut sink = FnSink::new(|x: u32| { total += x as u64; });
118///     sink.accept(10).unwrap();
119///     sink.accept(20).unwrap();
120/// }
121/// assert_eq!(total, 30);
122/// ```
123pub struct FnSink<T, F: FnMut(T)> {
124    f: F,
125    _marker: core::marker::PhantomData<T>,
126}
127
128impl<T, F: FnMut(T)> FnSink<T, F> {
129    /// Wrap a closure as a sink implementation.
130    pub fn new(f: F) -> Self {
131        FnSink {
132            f,
133            _marker: core::marker::PhantomData,
134        }
135    }
136}
137
138impl<T, F: FnMut(T)> Sink<T> for FnSink<T, F> {
139    fn accept(&mut self, item: T) -> Result<(), FabricError> {
140        (self.f)(item);
141        Ok(())
142    }
143}