grafos_profile/
data_flow.rs

1//! Data-flow diagram — auto-generated from observed read/write patterns across leases.
2//!
3//! Infers a DAG of `(span-name) --[writes N bytes]--> (lease) --[reads M bytes]--> (span-name)`
4//! from the recording's per-span lease IDs and operation types.
5
6use alloc::collections::BTreeMap;
7use alloc::string::String;
8use alloc::vec::Vec;
9
10use grafos_observe::event::{OpType, ResourceType};
11
12use crate::recording::ProfileRecording;
13
/// Kind of node in the data-flow graph.
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to being
/// comparable: it can be passed by value and used as a set or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataFlowNodeKind {
    /// A span (operation) node.
    Span,
    /// A lease (resource) node.
    Lease,
}
22
/// A node in the data-flow diagram.
///
/// A node is either a span (an operation, collapsed by span name) or a lease
/// (a resource that spans read from or write to); `kind` tells them apart.
#[derive(Debug, Clone)]
pub struct DataFlowNode {
    /// Unique identifier for this node; edges refer to nodes by this string.
    pub id: String,
    /// Display label.
    pub label: String,
    /// Kind of node (span or lease).
    pub kind: DataFlowNodeKind,
    /// Resource type (for lease nodes); `None` for span nodes or when no
    /// resource type could be inferred.
    pub resource_type: Option<ResourceType>,
    /// Size metric: intended to be bytes for lease nodes and operation count
    /// for span nodes.
    pub size: u64,
}
37
/// A directed edge in the data-flow diagram.
///
/// Edges connect node IDs rather than node indices, so they stay valid if the
/// node list is reordered. All fields are plain values, which makes edges
/// comparable with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataFlowEdge {
    /// Source node ID.
    pub from: String,
    /// Target node ID.
    pub to: String,
    /// Bytes transferred along this edge.
    pub bytes: u64,
    /// Human-readable edge label. `DataFlowDiagram::from_recording` fills this
    /// with the formatted byte count (e.g. "4.0 KB").
    pub label: String,
}
50
/// Data-flow diagram inferred from a recording.
///
/// A plain container: `edges` reference entries of `nodes` through their
/// string IDs (`DataFlowEdge::from` / `DataFlowEdge::to`).
#[derive(Debug, Clone)]
pub struct DataFlowDiagram {
    /// All nodes in the graph.
    pub nodes: Vec<DataFlowNode>,
    /// All edges in the graph.
    pub edges: Vec<DataFlowEdge>,
}
59
60impl DataFlowDiagram {
61    /// Build a data-flow diagram from a profile recording.
62    ///
63    /// For each lease, finds all spans that wrote to it and all spans that read
64    /// from it. Creates directed edges: writer-span -> lease -> reader-span.
65    /// Spans with the same name are collapsed into single nodes.
66    pub fn from_recording(rec: &ProfileRecording) -> Self {
67        // Track per-lease writers and readers
68        // Key: lease_id, Value: { writers: [(span_name, bytes)], readers: [(span_name, bytes)] }
69        let mut lease_writers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
70        let mut lease_readers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
71        let mut lease_resource_type: BTreeMap<u128, ResourceType> = BTreeMap::new();
72
73        // Track span total ops for sizing
74        let mut span_ops: BTreeMap<String, u64> = BTreeMap::new();
75
76        for span in &rec.spans {
77            let is_writer = span
78                .ops
79                .iter()
80                .any(|(k, _)| matches!(k.op_type, OpType::Write | OpType::WriteBlock));
81            let is_reader = span
82                .ops
83                .iter()
84                .any(|(k, _)| matches!(k.op_type, OpType::Read | OpType::ReadBlock));
85
86            let total_ops: u64 = span.ops.iter().map(|(_, c)| c).sum();
87            *span_ops.entry(span.name.clone()).or_insert(0) += total_ops;
88
89            // Infer resource type from ops
90            let rt = span
91                .ops
92                .iter()
93                .max_by_key(|(_, c)| c)
94                .map(|(k, _)| k.resource_type);
95
96            for &lease_id in &span.lease_ids {
97                if let Some(r) = rt {
98                    lease_resource_type.entry(lease_id).or_insert(r);
99                }
100
101                if is_writer {
102                    lease_writers
103                        .entry(lease_id)
104                        .or_default()
105                        .push((span.name.clone(), span.bytes_written));
106                }
107                if is_reader {
108                    lease_readers
109                        .entry(lease_id)
110                        .or_default()
111                        .push((span.name.clone(), span.bytes_read));
112                }
113            }
114        }
115
116        // Build nodes
117        let mut nodes: Vec<DataFlowNode> = Vec::new();
118        let mut node_ids: BTreeMap<String, usize> = BTreeMap::new();
119
120        // Add span nodes (collapsed by name)
121        for (name, &ops) in &span_ops {
122            let id = alloc::format!("span:{}", name);
123            if !node_ids.contains_key(&id) {
124                node_ids.insert(id.clone(), nodes.len());
125                nodes.push(DataFlowNode {
126                    id,
127                    label: name.clone(),
128                    kind: DataFlowNodeKind::Span,
129                    resource_type: None,
130                    size: ops,
131                });
132            }
133        }
134
135        // Add lease nodes
136        let all_lease_ids: Vec<u128> = {
137            let mut ids: Vec<u128> = lease_writers
138                .keys()
139                .chain(lease_readers.keys())
140                .copied()
141                .collect();
142            ids.sort();
143            ids.dedup();
144            ids
145        };
146
147        for &lease_id in &all_lease_ids {
148            let id = alloc::format!("lease:{:x}", lease_id);
149            if !node_ids.contains_key(&id) {
150                let rt = lease_resource_type.get(&lease_id).copied();
151                node_ids.insert(id.clone(), nodes.len());
152                nodes.push(DataFlowNode {
153                    id,
154                    label: alloc::format!("lease {:x}", lease_id),
155                    kind: DataFlowNodeKind::Lease,
156                    resource_type: rt,
157                    size: 0,
158                });
159            }
160        }
161
162        // Build edges
163        let mut edges: Vec<DataFlowEdge> = Vec::new();
164        // Track aggregated edges: (from, to) -> total_bytes
165        let mut edge_map: BTreeMap<(String, String), u64> = BTreeMap::new();
166
167        for (&lease_id, writers) in &lease_writers {
168            let lease_node_id = alloc::format!("lease:{:x}", lease_id);
169            for (span_name, bytes) in writers {
170                let span_node_id = alloc::format!("span:{}", span_name);
171                let key = (span_node_id, lease_node_id.clone());
172                *edge_map.entry(key).or_insert(0) += bytes;
173            }
174        }
175
176        for (&lease_id, readers) in &lease_readers {
177            let lease_node_id = alloc::format!("lease:{:x}", lease_id);
178            for (span_name, bytes) in readers {
179                let span_node_id = alloc::format!("span:{}", span_name);
180                let key = (lease_node_id.clone(), span_node_id);
181                *edge_map.entry(key).or_insert(0) += bytes;
182            }
183        }
184
185        for ((from, to), bytes) in &edge_map {
186            edges.push(DataFlowEdge {
187                from: from.clone(),
188                to: to.clone(),
189                bytes: *bytes,
190                label: format_bytes(*bytes),
191            });
192        }
193
194        DataFlowDiagram { nodes, edges }
195    }
196
197    /// Render as self-contained HTML with force-directed layout.
198    #[cfg(feature = "html")]
199    pub fn render_html(&self) -> String {
200        let mut html = String::new();
201        html.push_str("<!DOCTYPE html>\n<html>\n<head>\n");
202        html.push_str("<meta charset=\"utf-8\">\n");
203        html.push_str("<title>grafOS Data Flow Diagram</title>\n");
204        html.push_str("<style>\n");
205        html.push_str("body { font-family: monospace; margin: 0; padding: 20px; background: #1a1a2e; color: #eee; }\n");
206        html.push_str("h1 { font-size: 18px; }\n");
207        html.push_str("svg { border: 1px solid #333; background: #0d0d1a; }\n");
208        html.push_str(".node-span rect { stroke: #eee; stroke-width: 1; }\n");
209        html.push_str(".node-lease circle { stroke: #eee; stroke-width: 1; }\n");
210        html.push_str(".edge line { stroke: #666; }\n");
211        html.push_str(".edge-label { fill: #aaa; font-size: 10px; }\n");
212        html.push_str(".node-label { fill: #eee; font-size: 11px; text-anchor: middle; }\n");
213        html.push_str("</style>\n</head>\n<body>\n");
214        html.push_str("<h1>grafOS Data Flow Diagram</h1>\n");
215        html.push_str(&alloc::format!(
216            "<p>{} nodes, {} edges</p>\n",
217            self.nodes.len(),
218            self.edges.len()
219        ));
220
221        // Simple grid layout (force-directed would need JS library; keep it minimal)
222        let svg_width = 900;
223        let svg_height = 600;
224
225        html.push_str(&alloc::format!(
226            "<svg width=\"{}\" height=\"{}\">\n",
227            svg_width,
228            svg_height
229        ));
230
231        // Position nodes in a simple grid
232        let n = self.nodes.len().max(1);
233        let cols = ((n as f64).sqrt().ceil() as usize).max(1);
234
235        let mut node_positions: BTreeMap<String, (f64, f64)> = BTreeMap::new();
236        for (i, node) in self.nodes.iter().enumerate() {
237            let col = i % cols;
238            let row = i / cols;
239            let x = 80.0 + (col as f64) * 160.0;
240            let y = 80.0 + (row as f64) * 120.0;
241            node_positions.insert(node.id.clone(), (x, y));
242        }
243
244        // Draw edges
245        for edge in &self.edges {
246            if let (Some(&(x1, y1)), Some(&(x2, y2))) =
247                (node_positions.get(&edge.from), node_positions.get(&edge.to))
248            {
249                let mx = (x1 + x2) / 2.0;
250                let my = (y1 + y2) / 2.0;
251                html.push_str(&alloc::format!(
252                    "<g class=\"edge\"><line x1=\"{x1:.0}\" y1=\"{y1:.0}\" x2=\"{x2:.0}\" y2=\"{y2:.0}\"/>\
253                     <text class=\"edge-label\" x=\"{mx:.0}\" y=\"{my:.0}\">{}</text></g>\n",
254                    edge.label
255                ));
256            }
257        }
258
259        // Draw nodes
260        for node in &self.nodes {
261            if let Some(&(x, y)) = node_positions.get(&node.id) {
262                match node.kind {
263                    DataFlowNodeKind::Span => {
264                        html.push_str(&alloc::format!(
265                            "<g class=\"node-span\"><rect x=\"{:.0}\" y=\"{:.0}\" width=\"120\" height=\"30\" rx=\"3\" fill=\"#2a2a4a\"/>\
266                             <text class=\"node-label\" x=\"{:.0}\" y=\"{:.0}\">{}</text></g>\n",
267                            x - 60.0, y - 15.0, x, y + 4.0, node.label
268                        ));
269                    }
270                    DataFlowNodeKind::Lease => {
271                        let color = match node.resource_type {
272                            Some(ResourceType::Mem) => "#4a90d9",
273                            Some(ResourceType::Block) => "#50c878",
274                            Some(ResourceType::Gpu) => "#ff8c42",
275                            Some(ResourceType::GpuMem) => "#d97706",
276                            Some(ResourceType::Cpu) => "#9b59b6",
277                            Some(ResourceType::Net) => "#e74c3c",
278                            None => "#999",
279                        };
280                        html.push_str(&alloc::format!(
281                            "<g class=\"node-lease\"><circle cx=\"{x:.0}\" cy=\"{y:.0}\" r=\"20\" fill=\"{color}\"/>\
282                             <text class=\"node-label\" x=\"{x:.0}\" y=\"{:.0}\">{}</text></g>\n",
283                            y + 35.0, node.label
284                        ));
285                    }
286                }
287            }
288        }
289
290        html.push_str("</svg>\n</body>\n</html>\n");
291        html
292    }
293}
294
/// Format a byte count for display using binary multiples.
///
/// Values below 1024 are printed exactly with a `B` suffix; larger values are
/// scaled to the largest fitting unit (`KB` = 2^10, `MB` = 2^20, `GB` = 2^30)
/// and printed with one decimal place.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    // Pick the largest unit that does not exceed the value.
    let (unit_size, suffix) = match bytes {
        b if b >= GIB => (GIB, "GB"),
        b if b >= MIB => (MIB, "MB"),
        b if b >= KIB => (KIB, "KB"),
        _ => return alloc::format!("{} B", bytes),
    };

    alloc::format!("{:.1} {}", bytes as f64 / unit_size as f64, suffix)
}
306
307#[cfg(test)]
308mod tests {
309    use super::*;
310    use grafos_observe::event::{OpType, ResourceType};
311    use grafos_observe::span::ResourceSpan;
312    use grafos_observe::trace::TraceContext;
313
314    fn test_ctx() -> TraceContext {
315        let mut bytes = [0u8; 24];
316        for (i, b) in bytes.iter_mut().enumerate() {
317            *b = (i as u8).wrapping_add(0x42);
318        }
319        TraceContext::new_root(&bytes)
320    }
321
322    #[test]
323    fn writer_to_reader_edge() {
324        let ctx = test_ctx();
325        let c1 = ctx.child(&[0xAA; 8]);
326
327        // Span A writes to lease X
328        let mut span_a = ResourceSpan::new("writer_a", ctx);
329        span_a.start_time_unix_us = 1000;
330        span_a.end_time_unix_us = 2000;
331        span_a.add_lease_id(0x42);
332        span_a.bytes_written = 4096;
333        span_a.record_op(ResourceType::Mem, OpType::Write, 1);
334
335        // Span B reads from lease X
336        let mut span_b = ResourceSpan::new("reader_b", c1);
337        span_b.start_time_unix_us = 2000;
338        span_b.end_time_unix_us = 3000;
339        span_b.add_lease_id(0x42);
340        span_b.bytes_read = 4096;
341        span_b.record_op(ResourceType::Mem, OpType::Read, 1);
342
343        let rec = ProfileRecording::from_spans(vec![span_a, span_b]);
344        let diagram = DataFlowDiagram::from_recording(&rec);
345
346        // Should have 3 nodes: writer_a (span), lease 42, reader_b (span)
347        assert_eq!(diagram.nodes.len(), 3);
348
349        let span_a_node = diagram
350            .nodes
351            .iter()
352            .find(|n| n.label == "writer_a")
353            .unwrap();
354        assert_eq!(span_a_node.kind, DataFlowNodeKind::Span);
355
356        let span_b_node = diagram
357            .nodes
358            .iter()
359            .find(|n| n.label == "reader_b")
360            .unwrap();
361        assert_eq!(span_b_node.kind, DataFlowNodeKind::Span);
362
363        let lease_node = diagram
364            .nodes
365            .iter()
366            .find(|n| n.kind == DataFlowNodeKind::Lease)
367            .unwrap();
368        assert_eq!(lease_node.resource_type, Some(ResourceType::Mem));
369
370        // Should have 2 edges: writer_a -> lease, lease -> reader_b
371        assert_eq!(diagram.edges.len(), 2);
372
373        let write_edge = diagram
374            .edges
375            .iter()
376            .find(|e| e.from.contains("writer_a"))
377            .unwrap();
378        assert!(write_edge.to.contains("lease:"));
379        assert_eq!(write_edge.bytes, 4096);
380
381        let read_edge = diagram
382            .edges
383            .iter()
384            .find(|e| e.to.contains("reader_b"))
385            .unwrap();
386        assert!(read_edge.from.contains("lease:"));
387        assert_eq!(read_edge.bytes, 4096);
388    }
389
390    #[test]
391    fn collapsed_same_name_spans() {
392        let ctx = test_ctx();
393        let c1 = ctx.child(&[0xAA; 8]);
394
395        // Two spans with the same name
396        let mut s1 = ResourceSpan::new("worker", ctx);
397        s1.add_lease_id(1);
398        s1.bytes_written = 100;
399        s1.record_op(ResourceType::Mem, OpType::Write, 1);
400
401        let mut s2 = ResourceSpan::new("worker", c1);
402        s2.add_lease_id(1);
403        s2.bytes_written = 200;
404        s2.record_op(ResourceType::Mem, OpType::Write, 1);
405
406        let rec = ProfileRecording::from_spans(vec![s1, s2]);
407        let diagram = DataFlowDiagram::from_recording(&rec);
408
409        // Only one "worker" span node (collapsed)
410        let span_nodes: Vec<_> = diagram
411            .nodes
412            .iter()
413            .filter(|n| n.kind == DataFlowNodeKind::Span)
414            .collect();
415        assert_eq!(span_nodes.len(), 1);
416        assert_eq!(span_nodes[0].label, "worker");
417
418        // Edge should have aggregated bytes: 100 + 200 = 300
419        let write_edge = diagram
420            .edges
421            .iter()
422            .find(|e| e.from.contains("worker"))
423            .unwrap();
424        assert_eq!(write_edge.bytes, 300);
425    }
426
427    #[cfg(feature = "html")]
428    #[test]
429    fn data_flow_html_structure() {
430        let ctx = test_ctx();
431        let mut span = ResourceSpan::new("test", ctx);
432        span.add_lease_id(1);
433        span.bytes_written = 100;
434        span.record_op(ResourceType::Mem, OpType::Write, 1);
435
436        let rec = ProfileRecording::from_spans(vec![span]);
437        let diagram = DataFlowDiagram::from_recording(&rec);
438        let html = diagram.render_html();
439
440        assert!(html.contains("<!DOCTYPE html>"));
441        assert!(html.contains("Data Flow Diagram"));
442        assert!(html.contains("</html>"));
443    }
444}