1use alloc::string::String;
8use alloc::vec::Vec;
9
10use grafos_observe::event::{OpType, ResourceType};
11use grafos_observe::span::{OpKey, ResourceSpan, SpanStatus};
12use grafos_observe::trace::{SpanId, TraceContext, TraceId};
13use serde::{Deserialize, Serialize};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SpanJson {
18 pub name: String,
19 pub trace_id: String,
20 pub span_id: String,
21 pub parent_span_id: Option<String>,
22 pub start_time_unix_us: u64,
23 pub end_time_unix_us: u64,
24 pub status: String,
25 pub status_message: Option<String>,
26 pub lease_ids: Vec<String>,
27 pub ops: Vec<OpJson>,
28 pub bytes_read: u64,
29 pub bytes_written: u64,
30 pub lease_cost_byte_secs: u64,
31 pub lease_acquire_wait_us: u64,
32 pub attributes: Vec<(String, String)>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct OpJson {
38 pub resource_type: String,
39 pub op_type: String,
40 pub count: u64,
41}
42
43impl SpanJson {
44 pub fn from_span(span: &ResourceSpan) -> Self {
46 let parent = if let Some(pid) = span.parent_span_id {
47 if pid.is_valid() {
48 Some(alloc::format!("{:016x}", pid.0))
49 } else {
50 None
51 }
52 } else {
53 None
54 };
55
56 let (status, status_message) = match &span.status {
57 SpanStatus::Ok => (String::from("ok"), None),
58 SpanStatus::Error(msg) => (String::from("error"), Some(msg.clone())),
59 };
60
61 SpanJson {
62 name: span.name.clone(),
63 trace_id: alloc::format!("{:032x}", span.trace_context.trace_id.0),
64 span_id: alloc::format!("{:016x}", span.trace_context.span_id.0),
65 parent_span_id: parent,
66 start_time_unix_us: span.start_time_unix_us,
67 end_time_unix_us: span.end_time_unix_us,
68 status,
69 status_message,
70 lease_ids: span
71 .lease_ids
72 .iter()
73 .map(|id| alloc::format!("{:032x}", id))
74 .collect(),
75 ops: span
76 .ops
77 .iter()
78 .map(|(key, count)| OpJson {
79 resource_type: alloc::format!("{}", key.resource_type),
80 op_type: alloc::format!("{}", key.op_type),
81 count: *count,
82 })
83 .collect(),
84 bytes_read: span.bytes_read,
85 bytes_written: span.bytes_written,
86 lease_cost_byte_secs: span.lease_cost_byte_secs,
87 lease_acquire_wait_us: span.lease_acquire_wait_us,
88 attributes: span.attributes.clone(),
89 }
90 }
91
92 pub fn to_span(&self) -> ResourceSpan {
94 let trace_id = u128::from_str_radix(&self.trace_id, 16).unwrap_or(0);
95 let span_id = u64::from_str_radix(&self.span_id, 16).unwrap_or(0);
96
97 let parent_span_id = self
98 .parent_span_id
99 .as_ref()
100 .map(|s| SpanId(u64::from_str_radix(s, 16).unwrap_or(0)));
101
102 let status = match self.status.as_str() {
103 "error" => SpanStatus::Error(
104 self.status_message
105 .clone()
106 .unwrap_or_else(|| String::from("unknown")),
107 ),
108 _ => SpanStatus::Ok,
109 };
110
111 let mut span = ResourceSpan {
112 name: self.name.clone(),
113 trace_context: TraceContext {
114 trace_id: TraceId(trace_id),
115 span_id: SpanId(span_id),
116 parent_span_id: parent_span_id.unwrap_or(SpanId::INVALID),
117 flags: 0x01, },
119 parent_span_id,
120 start_time_unix_us: self.start_time_unix_us,
121 end_time_unix_us: self.end_time_unix_us,
122 status,
123 lease_ids: self
124 .lease_ids
125 .iter()
126 .map(|s| u128::from_str_radix(s, 16).unwrap_or(0))
127 .collect(),
128 ops: Vec::new(),
129 bytes_read: self.bytes_read,
130 bytes_written: self.bytes_written,
131 lease_cost_byte_secs: self.lease_cost_byte_secs,
132 lease_acquire_wait_us: self.lease_acquire_wait_us,
133 attributes: self.attributes.clone(),
134 };
135
136 for op in &self.ops {
137 let resource_type = parse_resource_type(&op.resource_type);
138 let op_type = parse_op_type(&op.op_type);
139 span.ops.push((
140 OpKey {
141 resource_type,
142 op_type,
143 },
144 op.count,
145 ));
146 }
147
148 span
149 }
150}
151
152fn parse_resource_type(s: &str) -> ResourceType {
153 match s {
154 "mem" => ResourceType::Mem,
155 "block" => ResourceType::Block,
156 "gpu" => ResourceType::Gpu,
157 "cpu" => ResourceType::Cpu,
158 _ => ResourceType::Mem, }
160}
161
162fn parse_op_type(s: &str) -> OpType {
163 match s {
164 "read" => OpType::Read,
165 "write" => OpType::Write,
166 "read_block" => OpType::ReadBlock,
167 "write_block" => OpType::WriteBlock,
168 "gpu_submit" => OpType::GpuSubmit,
169 "tasklet_submit" => OpType::TaskletSubmit,
170 _ => OpType::Read, }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a root trace context from 24 deterministic seed bytes
    /// (`0x42`, `0x43`, ...).
    fn test_ctx() -> TraceContext {
        let mut bytes = [0u8; 24];
        for (i, b) in bytes.iter_mut().enumerate() {
            *b = (i as u8).wrapping_add(0x42);
        }
        TraceContext::new_root(&bytes)
    }

    /// A fully populated span survives `from_span` followed by `to_span`.
    #[test]
    fn roundtrip_span_json() {
        let ctx = test_ctx();
        let child_ctx = ctx.child(&[0xBB; 8]);
        let mut span = ResourceSpan::new("test_op", child_ctx);
        span.parent_span_id = Some(ctx.span_id);
        span.start_time_unix_us = 1_000_000;
        span.end_time_unix_us = 2_000_000;
        span.bytes_read = 4096;
        span.bytes_written = 2048;
        span.lease_cost_byte_secs = 8192;
        span.lease_acquire_wait_us = 500;
        span.add_lease_id(42);
        span.record_op(ResourceType::Mem, OpType::Read, 10);
        span.record_op(ResourceType::Block, OpType::WriteBlock, 5);
        span.set_attribute("env", "test");

        let json = SpanJson::from_span(&span);
        let recovered = json.to_span();

        assert_eq!(recovered.name, "test_op");
        assert_eq!(
            recovered.trace_context.trace_id,
            span.trace_context.trace_id
        );
        assert_eq!(recovered.trace_context.span_id, span.trace_context.span_id);
        assert_eq!(recovered.parent_span_id.unwrap(), ctx.span_id);
        assert_eq!(recovered.start_time_unix_us, 1_000_000);
        assert_eq!(recovered.end_time_unix_us, 2_000_000);
        assert_eq!(recovered.bytes_read, 4096);
        assert_eq!(recovered.bytes_written, 2048);
        assert_eq!(recovered.lease_cost_byte_secs, 8192);
        assert_eq!(recovered.lease_acquire_wait_us, 500);
        assert_eq!(recovered.lease_ids, vec![42u128]);
        assert_eq!(recovered.attributes, span.attributes);
        assert_eq!(recovered.op_count(ResourceType::Mem, OpType::Read), 10);
        assert_eq!(
            recovered.op_count(ResourceType::Block, OpType::WriteBlock),
            5
        );
    }

    /// An error status keeps its message across the round trip.
    #[test]
    fn error_status_roundtrip() {
        let ctx = test_ctx();
        let mut span = ResourceSpan::new("fail_op", ctx);
        span.status = SpanStatus::Error(String::from("connection refused"));

        let json = SpanJson::from_span(&span);
        let recovered = json.to_span();

        match &recovered.status {
            SpanStatus::Error(msg) => assert_eq!(msg, "connection refused"),
            _ => panic!("expected error status"),
        }
    }

    /// Malformed input is absorbed by the fallbacks in `to_span` instead of
    /// failing.
    #[test]
    fn malformed_input_uses_fallbacks() {
        let json = SpanJson {
            name: String::from("broken"),
            trace_id: String::from("not-hex"),
            span_id: String::from("zz"),
            parent_span_id: Some(String::from("??")),
            start_time_unix_us: 0,
            end_time_unix_us: 0,
            status: String::from("error"),
            status_message: None,
            lease_ids: vec![String::from("xyz")],
            ops: vec![OpJson {
                resource_type: String::from("quantum"),
                op_type: String::from("teleport"),
                count: 3,
            }],
            bytes_read: 0,
            bytes_written: 0,
            lease_cost_byte_secs: 0,
            lease_acquire_wait_us: 0,
            attributes: Vec::new(),
        };

        let recovered = json.to_span();

        assert_eq!(recovered.trace_context.trace_id, TraceId(0));
        assert_eq!(recovered.trace_context.span_id, SpanId(0));
        assert_eq!(recovered.parent_span_id.unwrap(), SpanId(0));
        assert_eq!(recovered.lease_ids, vec![0u128]);
        // Unknown resource/op strings fall back to Mem/Read.
        assert_eq!(recovered.op_count(ResourceType::Mem, OpType::Read), 3);
        match &recovered.status {
            SpanStatus::Error(msg) => assert_eq!(msg, "unknown"),
            _ => panic!("expected error status"),
        }

        let mut unknown_status = json.clone();
        unknown_status.status = String::from("weird");
        assert!(matches!(unknown_status.to_span().status, SpanStatus::Ok));
    }

    /// The serde representation round-trips through an actual JSON string.
    #[cfg(feature = "std")]
    #[test]
    fn serde_json_roundtrip() {
        let ctx = test_ctx();
        let mut span = ResourceSpan::new("serde_test", ctx);
        span.start_time_unix_us = 100;
        span.end_time_unix_us = 200;
        span.add_lease_id(7);

        let json_obj = SpanJson::from_span(&span);
        let serialized = serde_json::to_string(&json_obj).unwrap();
        let deserialized: SpanJson = serde_json::from_str(&serialized).unwrap();
        let recovered = deserialized.to_span();

        assert_eq!(recovered.name, "serde_test");
        assert_eq!(recovered.start_time_unix_us, 100);
        assert_eq!(recovered.end_time_unix_us, 200);
        assert_eq!(recovered.lease_ids, vec![7u128]);
    }
}