1use alloc::collections::BTreeMap;
7use alloc::string::String;
8use alloc::vec::Vec;
9
10use grafos_observe::event::{OpType, ResourceType};
11
12use crate::recording::ProfileRecording;
13
/// The two kinds of node that can appear in a [`DataFlowDiagram`].
///
/// The enum is fieldless, so it is `Copy` and `Hash` in addition to the
/// comparison traits; callers can pass it by value or use it as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataFlowNodeKind {
    /// A span, identified by its name.
    Span,
    /// A lease that at least one span wrote to or read from.
    Lease,
}
22
/// A single node of a [`DataFlowDiagram`]: either a span or a lease.
#[derive(Debug, Clone)]
pub struct DataFlowNode {
    /// Identifier that edges refer to. `from_recording` produces
    /// `span:<name>` for spans and `lease:<id in lowercase hex>` for leases.
    pub id: String,
    /// Human-readable text drawn next to the node.
    pub label: String,
    /// Whether this node stands for a span or a lease.
    pub kind: DataFlowNodeKind,
    /// Resource type associated with a lease node, when one could be
    /// determined. `from_recording` always leaves this `None` for span nodes.
    pub resource_type: Option<ResourceType>,
    /// Node weight. `from_recording` stores the summed operation count for
    /// span nodes and `0` for lease nodes.
    pub size: u64,
}
37
/// A directed edge between two nodes of a [`DataFlowDiagram`].
///
/// `from_recording` emits `span -> lease` edges for writes and
/// `lease -> span` edges for reads.
#[derive(Debug, Clone)]
pub struct DataFlowEdge {
    /// `id` of the node the data flows out of.
    pub from: String,
    /// `id` of the node the data flows into.
    pub to: String,
    /// Number of bytes attributed to this edge.
    pub bytes: u64,
    /// Display text for the edge; `from_recording` fills it with the
    /// human-readable form of `bytes`.
    pub label: String,
}
50
/// A graph of spans and leases connected by the bytes that flowed between
/// them.
#[derive(Debug, Clone)]
pub struct DataFlowDiagram {
    /// All nodes of the graph. `from_recording` lists span nodes first
    /// (ordered by name), followed by lease nodes (ordered by lease id).
    pub nodes: Vec<DataFlowNode>,
    /// All edges of the graph; `from` / `to` refer to node `id`s.
    pub edges: Vec<DataFlowEdge>,
}
59
60impl DataFlowDiagram {
61 pub fn from_recording(rec: &ProfileRecording) -> Self {
67 let mut lease_writers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
70 let mut lease_readers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
71 let mut lease_resource_type: BTreeMap<u128, ResourceType> = BTreeMap::new();
72
73 let mut span_ops: BTreeMap<String, u64> = BTreeMap::new();
75
76 for span in &rec.spans {
77 let is_writer = span
78 .ops
79 .iter()
80 .any(|(k, _)| matches!(k.op_type, OpType::Write | OpType::WriteBlock));
81 let is_reader = span
82 .ops
83 .iter()
84 .any(|(k, _)| matches!(k.op_type, OpType::Read | OpType::ReadBlock));
85
86 let total_ops: u64 = span.ops.iter().map(|(_, c)| c).sum();
87 *span_ops.entry(span.name.clone()).or_insert(0) += total_ops;
88
89 let rt = span
91 .ops
92 .iter()
93 .max_by_key(|(_, c)| c)
94 .map(|(k, _)| k.resource_type);
95
96 for &lease_id in &span.lease_ids {
97 if let Some(r) = rt {
98 lease_resource_type.entry(lease_id).or_insert(r);
99 }
100
101 if is_writer {
102 lease_writers
103 .entry(lease_id)
104 .or_default()
105 .push((span.name.clone(), span.bytes_written));
106 }
107 if is_reader {
108 lease_readers
109 .entry(lease_id)
110 .or_default()
111 .push((span.name.clone(), span.bytes_read));
112 }
113 }
114 }
115
116 let mut nodes: Vec<DataFlowNode> = Vec::new();
118 let mut node_ids: BTreeMap<String, usize> = BTreeMap::new();
119
120 for (name, &ops) in &span_ops {
122 let id = alloc::format!("span:{}", name);
123 if !node_ids.contains_key(&id) {
124 node_ids.insert(id.clone(), nodes.len());
125 nodes.push(DataFlowNode {
126 id,
127 label: name.clone(),
128 kind: DataFlowNodeKind::Span,
129 resource_type: None,
130 size: ops,
131 });
132 }
133 }
134
135 let all_lease_ids: Vec<u128> = {
137 let mut ids: Vec<u128> = lease_writers
138 .keys()
139 .chain(lease_readers.keys())
140 .copied()
141 .collect();
142 ids.sort();
143 ids.dedup();
144 ids
145 };
146
147 for &lease_id in &all_lease_ids {
148 let id = alloc::format!("lease:{:x}", lease_id);
149 if !node_ids.contains_key(&id) {
150 let rt = lease_resource_type.get(&lease_id).copied();
151 node_ids.insert(id.clone(), nodes.len());
152 nodes.push(DataFlowNode {
153 id,
154 label: alloc::format!("lease {:x}", lease_id),
155 kind: DataFlowNodeKind::Lease,
156 resource_type: rt,
157 size: 0,
158 });
159 }
160 }
161
162 let mut edges: Vec<DataFlowEdge> = Vec::new();
164 let mut edge_map: BTreeMap<(String, String), u64> = BTreeMap::new();
166
167 for (&lease_id, writers) in &lease_writers {
168 let lease_node_id = alloc::format!("lease:{:x}", lease_id);
169 for (span_name, bytes) in writers {
170 let span_node_id = alloc::format!("span:{}", span_name);
171 let key = (span_node_id, lease_node_id.clone());
172 *edge_map.entry(key).or_insert(0) += bytes;
173 }
174 }
175
176 for (&lease_id, readers) in &lease_readers {
177 let lease_node_id = alloc::format!("lease:{:x}", lease_id);
178 for (span_name, bytes) in readers {
179 let span_node_id = alloc::format!("span:{}", span_name);
180 let key = (lease_node_id.clone(), span_node_id);
181 *edge_map.entry(key).or_insert(0) += bytes;
182 }
183 }
184
185 for ((from, to), bytes) in &edge_map {
186 edges.push(DataFlowEdge {
187 from: from.clone(),
188 to: to.clone(),
189 bytes: *bytes,
190 label: format_bytes(*bytes),
191 });
192 }
193
194 DataFlowDiagram { nodes, edges }
195 }
196
197 #[cfg(feature = "html")]
199 pub fn render_html(&self) -> String {
200 let mut html = String::new();
201 html.push_str("<!DOCTYPE html>\n<html>\n<head>\n");
202 html.push_str("<meta charset=\"utf-8\">\n");
203 html.push_str("<title>grafOS Data Flow Diagram</title>\n");
204 html.push_str("<style>\n");
205 html.push_str("body { font-family: monospace; margin: 0; padding: 20px; background: #1a1a2e; color: #eee; }\n");
206 html.push_str("h1 { font-size: 18px; }\n");
207 html.push_str("svg { border: 1px solid #333; background: #0d0d1a; }\n");
208 html.push_str(".node-span rect { stroke: #eee; stroke-width: 1; }\n");
209 html.push_str(".node-lease circle { stroke: #eee; stroke-width: 1; }\n");
210 html.push_str(".edge line { stroke: #666; }\n");
211 html.push_str(".edge-label { fill: #aaa; font-size: 10px; }\n");
212 html.push_str(".node-label { fill: #eee; font-size: 11px; text-anchor: middle; }\n");
213 html.push_str("</style>\n</head>\n<body>\n");
214 html.push_str("<h1>grafOS Data Flow Diagram</h1>\n");
215 html.push_str(&alloc::format!(
216 "<p>{} nodes, {} edges</p>\n",
217 self.nodes.len(),
218 self.edges.len()
219 ));
220
221 let svg_width = 900;
223 let svg_height = 600;
224
225 html.push_str(&alloc::format!(
226 "<svg width=\"{}\" height=\"{}\">\n",
227 svg_width,
228 svg_height
229 ));
230
231 let n = self.nodes.len().max(1);
233 let cols = ((n as f64).sqrt().ceil() as usize).max(1);
234
235 let mut node_positions: BTreeMap<String, (f64, f64)> = BTreeMap::new();
236 for (i, node) in self.nodes.iter().enumerate() {
237 let col = i % cols;
238 let row = i / cols;
239 let x = 80.0 + (col as f64) * 160.0;
240 let y = 80.0 + (row as f64) * 120.0;
241 node_positions.insert(node.id.clone(), (x, y));
242 }
243
244 for edge in &self.edges {
246 if let (Some(&(x1, y1)), Some(&(x2, y2))) =
247 (node_positions.get(&edge.from), node_positions.get(&edge.to))
248 {
249 let mx = (x1 + x2) / 2.0;
250 let my = (y1 + y2) / 2.0;
251 html.push_str(&alloc::format!(
252 "<g class=\"edge\"><line x1=\"{x1:.0}\" y1=\"{y1:.0}\" x2=\"{x2:.0}\" y2=\"{y2:.0}\"/>\
253 <text class=\"edge-label\" x=\"{mx:.0}\" y=\"{my:.0}\">{}</text></g>\n",
254 edge.label
255 ));
256 }
257 }
258
259 for node in &self.nodes {
261 if let Some(&(x, y)) = node_positions.get(&node.id) {
262 match node.kind {
263 DataFlowNodeKind::Span => {
264 html.push_str(&alloc::format!(
265 "<g class=\"node-span\"><rect x=\"{:.0}\" y=\"{:.0}\" width=\"120\" height=\"30\" rx=\"3\" fill=\"#2a2a4a\"/>\
266 <text class=\"node-label\" x=\"{:.0}\" y=\"{:.0}\">{}</text></g>\n",
267 x - 60.0, y - 15.0, x, y + 4.0, node.label
268 ));
269 }
270 DataFlowNodeKind::Lease => {
271 let color = match node.resource_type {
272 Some(ResourceType::Mem) => "#4a90d9",
273 Some(ResourceType::Block) => "#50c878",
274 Some(ResourceType::Gpu) => "#ff8c42",
275 Some(ResourceType::Cpu) => "#9b59b6",
276 Some(ResourceType::Net) => "#e74c3c",
277 None => "#999",
278 };
279 html.push_str(&alloc::format!(
280 "<g class=\"node-lease\"><circle cx=\"{x:.0}\" cy=\"{y:.0}\" r=\"20\" fill=\"{color}\"/>\
281 <text class=\"node-label\" x=\"{x:.0}\" y=\"{:.0}\">{}</text></g>\n",
282 y + 35.0, node.label
283 ));
284 }
285 }
286 }
287 }
288
289 html.push_str("</svg>\n</body>\n</html>\n");
290 html
291 }
292}
293
/// Formats a byte count for display using binary multiples.
///
/// Values below 1024 are printed exactly with a ` B` suffix; larger values
/// are scaled to `KB` (2^10), `MB` (2^20) or `GB` (2^30) and printed with one
/// decimal place.
fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1 << 10;
    const MB: u64 = 1 << 20;
    const GB: u64 = 1 << 30;

    // The units are powers of two, so converting them to `f64` is exact.
    let scaled_by = |unit: u64| bytes as f64 / unit as f64;

    match bytes {
        b if b >= GB => alloc::format!("{:.1} GB", scaled_by(GB)),
        b if b >= MB => alloc::format!("{:.1} MB", scaled_by(MB)),
        b if b >= KB => alloc::format!("{:.1} KB", scaled_by(KB)),
        b => alloc::format!("{} B", b),
    }
}
305
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_observe::event::{OpType, ResourceType};
    use grafos_observe::span::ResourceSpan;
    use grafos_observe::trace::TraceContext;

    /// Root trace context built from a fixed byte pattern, so every test run
    /// sees the same input.
    fn test_ctx() -> TraceContext {
        let mut seed = [0u8; 24];
        for (idx, byte) in seed.iter_mut().enumerate() {
            *byte = (idx as u8).wrapping_add(0x42);
        }
        TraceContext::new_root(&seed)
    }

    /// Returns the first node carrying `label`; panics when there is none.
    fn node_labelled<'d>(diagram: &'d DataFlowDiagram, label: &str) -> &'d DataFlowNode {
        diagram.nodes.iter().find(|n| n.label == label).unwrap()
    }

    /// Returns the first edge whose source id contains `needle`; panics when
    /// there is none.
    fn edge_leaving<'d>(diagram: &'d DataFlowDiagram, needle: &str) -> &'d DataFlowEdge {
        diagram
            .edges
            .iter()
            .find(|e| e.from.contains(needle))
            .unwrap()
    }

    /// A writer and a reader sharing one lease yield
    /// `writer -> lease -> reader`.
    #[test]
    fn writer_to_reader_edge() {
        let root = test_ctx();
        let child = root.child(&[0xAA; 8]);

        let mut writer = ResourceSpan::new("writer_a", root);
        writer.start_time_unix_us = 1000;
        writer.end_time_unix_us = 2000;
        writer.add_lease_id(0x42);
        writer.bytes_written = 4096;
        writer.record_op(ResourceType::Mem, OpType::Write, 1);

        let mut reader = ResourceSpan::new("reader_b", child);
        reader.start_time_unix_us = 2000;
        reader.end_time_unix_us = 3000;
        reader.add_lease_id(0x42);
        reader.bytes_read = 4096;
        reader.record_op(ResourceType::Mem, OpType::Read, 1);

        let recording = ProfileRecording::from_spans(vec![writer, reader]);
        let diagram = DataFlowDiagram::from_recording(&recording);

        // Two span nodes plus the shared lease.
        assert_eq!(diagram.nodes.len(), 3);
        assert_eq!(
            node_labelled(&diagram, "writer_a").kind,
            DataFlowNodeKind::Span
        );
        assert_eq!(
            node_labelled(&diagram, "reader_b").kind,
            DataFlowNodeKind::Span
        );

        let lease = diagram
            .nodes
            .iter()
            .find(|n| n.kind == DataFlowNodeKind::Lease)
            .unwrap();
        assert_eq!(lease.resource_type, Some(ResourceType::Mem));

        // One edge into the lease, one edge out of it.
        assert_eq!(diagram.edges.len(), 2);

        let outgoing = edge_leaving(&diagram, "writer_a");
        assert!(outgoing.to.contains("lease:"));
        assert_eq!(outgoing.bytes, 4096);

        let incoming = diagram
            .edges
            .iter()
            .find(|e| e.to.contains("reader_b"))
            .unwrap();
        assert!(incoming.from.contains("lease:"));
        assert_eq!(incoming.bytes, 4096);
    }

    /// Spans sharing a name collapse into one node and their bytes add up.
    #[test]
    fn collapsed_same_name_spans() {
        let root = test_ctx();
        let child = root.child(&[0xAA; 8]);

        let mut first = ResourceSpan::new("worker", root);
        first.add_lease_id(1);
        first.bytes_written = 100;
        first.record_op(ResourceType::Mem, OpType::Write, 1);

        let mut second = ResourceSpan::new("worker", child);
        second.add_lease_id(1);
        second.bytes_written = 200;
        second.record_op(ResourceType::Mem, OpType::Write, 1);

        let recording = ProfileRecording::from_spans(vec![first, second]);
        let diagram = DataFlowDiagram::from_recording(&recording);

        // Exactly one span node, and it is the shared "worker".
        let mut span_nodes = diagram
            .nodes
            .iter()
            .filter(|n| n.kind == DataFlowNodeKind::Span);
        let only_span = span_nodes.next().unwrap();
        assert!(span_nodes.next().is_none());
        assert_eq!(only_span.label, "worker");

        // 100 + 200 bytes merged onto a single edge.
        assert_eq!(edge_leaving(&diagram, "worker").bytes, 300);
    }

    /// The rendered page carries the expected document skeleton.
    #[cfg(feature = "html")]
    #[test]
    fn data_flow_html_structure() {
        let mut only = ResourceSpan::new("test", test_ctx());
        only.add_lease_id(1);
        only.bytes_written = 100;
        only.record_op(ResourceType::Mem, OpType::Write, 1);

        let recording = ProfileRecording::from_spans(vec![only]);
        let page = DataFlowDiagram::from_recording(&recording).render_html();

        for fragment in ["<!DOCTYPE html>", "Data Flow Diagram", "</html>"] {
            assert!(page.contains(fragment));
        }
    }
}