1use alloc::string::String;
9use core::fmt;
10
11#[cfg(feature = "std")]
16static GLOBAL_SINK: std::sync::OnceLock<Box<dyn EventSink + Send + Sync>> =
17 std::sync::OnceLock::new();
18
19#[cfg(feature = "std")]
25pub fn set_global_sink(sink: Box<dyn EventSink + Send + Sync>) -> bool {
26 GLOBAL_SINK.set(sink).is_ok()
27}
28
29#[cfg(feature = "std")]
33pub fn emit_event(event: FabricEvent) {
34 if let Some(sink) = GLOBAL_SINK.get() {
35 sink.emit(&event);
36 }
37}
38
39#[cfg(not(feature = "std"))]
41pub fn emit_event(_event: FabricEvent) {}
42
43#[cfg(not(feature = "std"))]
45pub fn set_global_sink(_sink: ()) -> bool {
46 false
47}
48
49#[derive(Debug, Clone)]
58pub enum FabricEvent {
59 LeaseAcquired {
61 resource_type: ResourceType,
63 lease_id: u64,
65 node: String,
67 bytes: u64,
69 trace_id: Option<String>,
71 },
72 LeaseDropped {
74 resource_type: ResourceType,
76 lease_id: u64,
78 node: String,
80 },
81 LeaseExpired {
83 resource_type: ResourceType,
85 lease_id: u64,
87 node: String,
89 },
90 OpCompleted {
92 op_type: OpType,
94 duration_us: u64,
96 bytes: u64,
98 },
99 OpFailed {
101 op_type: OpType,
103 error: String,
105 },
106 RewriteStarted {
108 plan_id: u64,
110 },
111 RewriteCompleted {
113 plan_id: u64,
115 phase: RewritePhase,
117 },
118 ServiceRegistered {
120 name: String,
122 version: String,
124 },
125 ServiceDeregistered {
127 name: String,
129 },
130 MessagePublished {
132 topic: String,
134 bytes: u64,
136 },
137 MessageConsumed {
139 topic: String,
141 group: String,
143 },
144 ObjectStored {
146 bucket: String,
148 key: String,
150 bytes: u64,
152 },
153 ObjectRetrieved {
155 bucket: String,
157 key: String,
159 bytes: u64,
161 },
162 LeaseRevoked {
164 resource_type: ResourceType,
166 lease_id: u64,
168 node: String,
170 trace_id: Option<String>,
172 },
173 LeaseFenced {
175 resource_type: ResourceType,
177 lease_id: u64,
179 node: String,
181 reason: String,
183 trace_id: Option<String>,
185 },
186 TeardownFailed {
188 resource_type: ResourceType,
190 lease_id: u64,
192 node: String,
194 error: String,
196 },
197 AuthFailed {
199 node: String,
201 reason: String,
203 trace_id: Option<String>,
205 },
206 ReplayRejected {
208 node: String,
210 nonce: u64,
212 trace_id: Option<String>,
214 },
215 TokenValidationFailed {
217 node: String,
219 reason: String,
221 },
222 ListenerAcquired {
224 port: u16,
226 node: String,
228 lease_id: u64,
230 },
231 ListenerRevoked {
233 port: u16,
235 node: String,
237 lease_id: u64,
239 },
240 ListenerFenced {
242 port: u16,
244 node: String,
246 lease_id: u64,
248 reason: String,
250 },
251 SessionAccepted {
253 listener_port: u16,
255 session_id: u64,
257 node: String,
259 },
260 SessionClosed {
262 listener_port: u16,
264 session_id: u64,
266 node: String,
268 },
269 SessionDrained {
271 listener_port: u16,
273 sessions_drained: u32,
275 node: String,
277 },
278 ServiceDeployed {
280 name: String,
282 instance_count: u32,
284 },
285 ServiceInstanceStateChanged {
287 name: String,
289 instance_id: u64,
291 state: String,
293 },
294 ServiceCutoverStarted {
296 name: String,
298 },
299 ServiceCutoverCompleted {
301 name: String,
303 },
304 ServiceFailoverTriggered {
306 name: String,
308 reason: String,
310 },
311 ServiceFailoverCompleted {
313 name: String,
315 },
316 ServiceIngressFenced {
318 name: String,
320 instance_id: u64,
322 },
323 ServiceUndeployed {
325 name: String,
327 },
328 TaskletSubmitted {
330 tasklet_id: u64,
332 node: String,
334 runtime_type: String,
336 trace_id: Option<String>,
338 },
339 TaskletCompleted {
341 tasklet_id: u64,
343 status: u8,
345 duration_us: u64,
347 output_bytes: u64,
349 runtime_type: String,
351 trace_id: Option<String>,
353 },
354 TaskletFailed {
356 tasklet_id: u64,
358 status: u8,
360 duration_us: u64,
362 reason: String,
364 runtime_type: String,
366 trace_id: Option<String>,
368 },
369 SecurityProfileActive {
371 mode: String,
373 },
374 AdmissionApproved {
376 tenant_id: String,
378 node: String,
380 resource_type: String,
382 bytes: u64,
384 trace_id: Option<String>,
386 },
387 AdmissionDenied {
389 tenant_id: String,
391 resource_type: String,
393 reason: String,
395 quota_violation: Option<grafos_core::QuotaViolation>,
398 trace_id: Option<String>,
400 },
401 PlacementDecision {
403 tenant_id: String,
405 node: String,
407 strategy: String,
409 score: f64,
411 trace_id: Option<String>,
413 },
414 PreemptionTriggered {
416 victim_lease_id: u64,
418 victim_tenant: String,
420 preemptor_tenant: String,
422 node: String,
424 reason: grafos_core::PreemptionReason,
429 trace_id: Option<String>,
431 },
432 CrossStateDisagreementResolved {
452 kind: &'static str,
455 protocol: &'static str,
458 lease_id: u64,
460 authority: String,
463 trace_id: Option<String>,
465 },
466 QuotaExceeded {
468 tenant_id: String,
470 resource_type: String,
472 requested: u64,
474 limit: u64,
476 trace_id: Option<String>,
478 },
479 TokenMinted {
481 tenant_id: String,
483 node: String,
485 ttl_secs: u32,
487 trace_id: Option<String>,
489 },
490 ProcessStarted {
492 pid: String,
494 scenario: String,
496 nodes: String,
498 },
499 ProcessHeartbeat {
501 pid: String,
503 },
504 ProcessCompleted {
506 pid: String,
508 exit_code: i32,
510 duration_secs: u64,
512 },
513 SchedulerElectionLost {
515 reason: String,
517 epoch: u64,
519 },
520 SchedulerPromotionFailed {
522 reason: String,
524 epoch: u64,
526 },
527 SchedulerStaleLeaderDetected {
529 local_epoch: u64,
531 node_epoch: u64,
533 },
534 SchedulerPromoted {
536 epoch: u64,
538 nodes_fenced: usize,
540 latency_ms: u64,
542 },
543}
544
545#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
549pub enum ResourceType {
550 Mem,
552 Block,
554 Gpu,
556 GpuMem,
558 Cpu,
560 Net,
562}
563
564impl fmt::Display for ResourceType {
565 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
566 match self {
567 ResourceType::Mem => write!(f, "mem"),
568 ResourceType::Block => write!(f, "block"),
569 ResourceType::Gpu => write!(f, "gpu"),
570 ResourceType::GpuMem => write!(f, "gpu_mem"),
571 ResourceType::Cpu => write!(f, "cpu"),
572 ResourceType::Net => write!(f, "net"),
573 }
574 }
575}
576
577impl From<grafos_core::ResourceKind> for ResourceType {
595 fn from(k: grafos_core::ResourceKind) -> Self {
596 match k {
597 grafos_core::ResourceKind::Mem => ResourceType::Mem,
598 grafos_core::ResourceKind::Block => ResourceType::Block,
599 grafos_core::ResourceKind::Net => ResourceType::Net,
600 grafos_core::ResourceKind::Cpu => ResourceType::Cpu,
601 grafos_core::ResourceKind::Gpu => ResourceType::Gpu,
602 grafos_core::ResourceKind::GpuMem => ResourceType::GpuMem,
603 grafos_core::ResourceKind::Tasklet => ResourceType::Cpu,
610 }
611 }
612}
613
614#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
619pub enum OpType {
620 Read,
622 Write,
624 ReadBlock,
626 WriteBlock,
628 GpuSubmit,
630 TaskletSubmit,
632}
633
634impl fmt::Display for OpType {
635 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636 match self {
637 OpType::Read => write!(f, "read"),
638 OpType::Write => write!(f, "write"),
639 OpType::ReadBlock => write!(f, "read_block"),
640 OpType::WriteBlock => write!(f, "write_block"),
641 OpType::GpuSubmit => write!(f, "gpu_submit"),
642 OpType::TaskletSubmit => write!(f, "tasklet_submit"),
643 }
644 }
645}
646
647#[derive(Debug, Clone, Copy, PartialEq, Eq)]
653pub enum RewritePhase {
654 Validate,
656 Stage,
658 Canary,
660 Cutover,
662 Cleanup,
664 Commit,
666 Rollback,
668}
669
670impl fmt::Display for RewritePhase {
671 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672 match self {
673 RewritePhase::Validate => write!(f, "validate"),
674 RewritePhase::Stage => write!(f, "stage"),
675 RewritePhase::Canary => write!(f, "canary"),
676 RewritePhase::Cutover => write!(f, "cutover"),
677 RewritePhase::Cleanup => write!(f, "cleanup"),
678 RewritePhase::Commit => write!(f, "commit"),
679 RewritePhase::Rollback => write!(f, "rollback"),
680 }
681 }
682}
683
684impl From<grafos_core::RewritePhase> for RewritePhase {
696 fn from(p: grafos_core::RewritePhase) -> Self {
697 match p {
698 grafos_core::RewritePhase::Validate => RewritePhase::Validate,
699 grafos_core::RewritePhase::Stage => RewritePhase::Stage,
700 grafos_core::RewritePhase::Canary => RewritePhase::Canary,
701 grafos_core::RewritePhase::Cutover => RewritePhase::Cutover,
702 grafos_core::RewritePhase::Cleanup => RewritePhase::Cleanup,
703 grafos_core::RewritePhase::Commit => RewritePhase::Commit,
704 grafos_core::RewritePhase::Rollback => RewritePhase::Rollback,
705 }
706 }
707}
708
709impl From<RewritePhase> for grafos_core::RewritePhase {
713 fn from(p: RewritePhase) -> Self {
714 match p {
715 RewritePhase::Validate => grafos_core::RewritePhase::Validate,
716 RewritePhase::Stage => grafos_core::RewritePhase::Stage,
717 RewritePhase::Canary => grafos_core::RewritePhase::Canary,
718 RewritePhase::Cutover => grafos_core::RewritePhase::Cutover,
719 RewritePhase::Cleanup => grafos_core::RewritePhase::Cleanup,
720 RewritePhase::Commit => grafos_core::RewritePhase::Commit,
721 RewritePhase::Rollback => grafos_core::RewritePhase::Rollback,
722 }
723 }
724}
725
726impl fmt::Display for FabricEvent {
727 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728 match self {
729 FabricEvent::LeaseAcquired {
730 resource_type,
731 lease_id,
732 node,
733 bytes,
734 trace_id,
735 } => {
736 write!(
737 f,
738 "lease_acquired: type={resource_type} id={lease_id} node={node} bytes={bytes}"
739 )?;
740 if let Some(tid) = trace_id {
741 write!(f, " trace_id={tid}")?;
742 }
743 Ok(())
744 }
745 FabricEvent::LeaseDropped {
746 resource_type,
747 lease_id,
748 node,
749 } => write!(
750 f,
751 "lease_dropped: type={resource_type} id={lease_id} node={node}"
752 ),
753 FabricEvent::LeaseExpired {
754 resource_type,
755 lease_id,
756 node,
757 } => write!(
758 f,
759 "lease_expired: type={resource_type} id={lease_id} node={node}"
760 ),
761 FabricEvent::OpCompleted {
762 op_type,
763 duration_us,
764 bytes,
765 } => write!(
766 f,
767 "op_completed: op={op_type} duration_us={duration_us} bytes={bytes}"
768 ),
769 FabricEvent::OpFailed { op_type, error } => {
770 write!(f, "op_failed: op={op_type} error={error}")
771 }
772 FabricEvent::RewriteStarted { plan_id } => {
773 write!(f, "rewrite_started: plan_id={plan_id}")
774 }
775 FabricEvent::RewriteCompleted { plan_id, phase } => {
776 write!(f, "rewrite_completed: plan_id={plan_id} phase={phase}")
777 }
778 FabricEvent::ServiceRegistered { name, version } => {
779 write!(f, "service_registered: name={name} version={version}")
780 }
781 FabricEvent::ServiceDeregistered { name } => {
782 write!(f, "service_deregistered: name={name}")
783 }
784 FabricEvent::MessagePublished { topic, bytes } => {
785 write!(f, "message_published: topic={topic} bytes={bytes}")
786 }
787 FabricEvent::MessageConsumed { topic, group } => {
788 write!(f, "message_consumed: topic={topic} group={group}")
789 }
790 FabricEvent::ObjectStored { bucket, key, bytes } => {
791 write!(f, "object_stored: bucket={bucket} key={key} bytes={bytes}")
792 }
793 FabricEvent::ObjectRetrieved { bucket, key, bytes } => {
794 write!(
795 f,
796 "object_retrieved: bucket={bucket} key={key} bytes={bytes}"
797 )
798 }
799 FabricEvent::LeaseRevoked {
800 resource_type,
801 lease_id,
802 node,
803 trace_id,
804 } => {
805 write!(
806 f,
807 "lease_revoked: type={resource_type} id={lease_id} node={node}"
808 )?;
809 if let Some(tid) = trace_id {
810 write!(f, " trace_id={tid}")?;
811 }
812 Ok(())
813 }
814 FabricEvent::LeaseFenced {
815 resource_type,
816 lease_id,
817 node,
818 reason,
819 trace_id,
820 } => {
821 write!(
822 f,
823 "lease_fenced: type={resource_type} id={lease_id} node={node} reason={reason}"
824 )?;
825 if let Some(tid) = trace_id {
826 write!(f, " trace_id={tid}")?;
827 }
828 Ok(())
829 }
830 FabricEvent::TeardownFailed {
831 resource_type,
832 lease_id,
833 node,
834 error,
835 } => write!(
836 f,
837 "teardown_failed: type={resource_type} id={lease_id} node={node} error={error}"
838 ),
839 FabricEvent::AuthFailed {
840 node,
841 reason,
842 trace_id,
843 } => {
844 write!(f, "auth_failed: node={node} reason={reason}")?;
845 if let Some(tid) = trace_id {
846 write!(f, " trace_id={tid}")?;
847 }
848 Ok(())
849 }
850 FabricEvent::ReplayRejected {
851 node,
852 nonce,
853 trace_id,
854 } => {
855 write!(f, "replay_rejected: node={node} nonce={nonce}")?;
856 if let Some(tid) = trace_id {
857 write!(f, " trace_id={tid}")?;
858 }
859 Ok(())
860 }
861 FabricEvent::TokenValidationFailed { node, reason } => {
862 write!(f, "token_validation_failed: node={node} reason={reason}")
863 }
864 FabricEvent::ListenerAcquired {
865 port,
866 node,
867 lease_id,
868 } => write!(
869 f,
870 "listener_acquired: port={port} node={node} lease_id={lease_id}"
871 ),
872 FabricEvent::ListenerRevoked {
873 port,
874 node,
875 lease_id,
876 } => write!(
877 f,
878 "listener_revoked: port={port} node={node} lease_id={lease_id}"
879 ),
880 FabricEvent::ListenerFenced {
881 port,
882 node,
883 lease_id,
884 reason,
885 } => write!(
886 f,
887 "listener_fenced: port={port} node={node} lease_id={lease_id} reason={reason}"
888 ),
889 FabricEvent::SessionAccepted {
890 listener_port,
891 session_id,
892 node,
893 } => write!(
894 f,
895 "session_accepted: listener_port={listener_port} session_id={session_id} node={node}"
896 ),
897 FabricEvent::SessionClosed {
898 listener_port,
899 session_id,
900 node,
901 } => write!(
902 f,
903 "session_closed: listener_port={listener_port} session_id={session_id} node={node}"
904 ),
905 FabricEvent::SessionDrained {
906 listener_port,
907 sessions_drained,
908 node,
909 } => write!(
910 f,
911 "session_drained: listener_port={listener_port} sessions_drained={sessions_drained} node={node}"
912 ),
913 FabricEvent::ServiceDeployed {
914 name,
915 instance_count,
916 } => write!(
917 f,
918 "service_deployed: name={name} instance_count={instance_count}"
919 ),
920 FabricEvent::ServiceInstanceStateChanged {
921 name,
922 instance_id,
923 state,
924 } => write!(
925 f,
926 "service_instance_state_changed: name={name} instance_id={instance_id} state={state}"
927 ),
928 FabricEvent::ServiceCutoverStarted { name } => {
929 write!(f, "service_cutover_started: name={name}")
930 }
931 FabricEvent::ServiceCutoverCompleted { name } => {
932 write!(f, "service_cutover_completed: name={name}")
933 }
934 FabricEvent::ServiceFailoverTriggered { name, reason } => {
935 write!(
936 f,
937 "service_failover_triggered: name={name} reason={reason}"
938 )
939 }
940 FabricEvent::ServiceFailoverCompleted { name } => {
941 write!(f, "service_failover_completed: name={name}")
942 }
943 FabricEvent::ServiceIngressFenced { name, instance_id } => {
944 write!(
945 f,
946 "service_ingress_fenced: name={name} instance_id={instance_id}"
947 )
948 }
949 FabricEvent::ServiceUndeployed { name } => {
950 write!(f, "service_undeployed: name={name}")
951 }
952 FabricEvent::TaskletSubmitted {
953 tasklet_id,
954 node,
955 runtime_type,
956 trace_id,
957 } => {
958 write!(
959 f,
960 "tasklet_submitted: tasklet_id={tasklet_id} node={node} runtime_type={runtime_type}"
961 )?;
962 if let Some(tid) = trace_id {
963 write!(f, " trace_id={tid}")?;
964 }
965 Ok(())
966 }
967 FabricEvent::TaskletCompleted {
968 tasklet_id,
969 status,
970 duration_us,
971 output_bytes,
972 runtime_type,
973 trace_id,
974 } => {
975 write!(
976 f,
977 "tasklet_completed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} output_bytes={output_bytes} runtime_type={runtime_type}"
978 )?;
979 if let Some(tid) = trace_id {
980 write!(f, " trace_id={tid}")?;
981 }
982 Ok(())
983 }
984 FabricEvent::TaskletFailed {
985 tasklet_id,
986 status,
987 duration_us,
988 reason,
989 runtime_type,
990 trace_id,
991 } => {
992 write!(
993 f,
994 "tasklet_failed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} reason={reason} runtime_type={runtime_type}"
995 )?;
996 if let Some(tid) = trace_id {
997 write!(f, " trace_id={tid}")?;
998 }
999 Ok(())
1000 }
1001 FabricEvent::SecurityProfileActive { mode } => {
1002 write!(f, "security_profile_active: mode={mode}")
1003 }
1004 FabricEvent::AdmissionApproved {
1005 tenant_id,
1006 node,
1007 resource_type,
1008 bytes,
1009 trace_id,
1010 } => {
1011 write!(
1012 f,
1013 "admission_approved: tenant={tenant_id} node={node} type={resource_type} bytes={bytes}"
1014 )?;
1015 if let Some(tid) = trace_id {
1016 write!(f, " trace_id={tid}")?;
1017 }
1018 Ok(())
1019 }
1020 FabricEvent::AdmissionDenied {
1021 tenant_id,
1022 resource_type,
1023 reason,
1024 quota_violation,
1025 trace_id,
1026 } => {
1027 write!(
1028 f,
1029 "admission_denied: tenant={tenant_id} type={resource_type} reason={reason}"
1030 )?;
1031 if let Some(violation) = quota_violation {
1032 write!(f, " quota_violation={}", violation.as_str())?;
1033 }
1034 if let Some(tid) = trace_id {
1035 write!(f, " trace_id={tid}")?;
1036 }
1037 Ok(())
1038 }
1039 FabricEvent::PlacementDecision {
1040 tenant_id,
1041 node,
1042 strategy,
1043 score,
1044 trace_id,
1045 } => {
1046 write!(
1047 f,
1048 "placement_decision: tenant={tenant_id} node={node} strategy={strategy} score={score}"
1049 )?;
1050 if let Some(tid) = trace_id {
1051 write!(f, " trace_id={tid}")?;
1052 }
1053 Ok(())
1054 }
1055 FabricEvent::PreemptionTriggered {
1056 victim_lease_id,
1057 victim_tenant,
1058 preemptor_tenant,
1059 node,
1060 reason,
1061 trace_id,
1062 } => {
1063 write!(
1064 f,
1065 "preemption_triggered: victim_lease_id={victim_lease_id} victim_tenant={victim_tenant} preemptor_tenant={preemptor_tenant} node={node} reason={reason}"
1066 )?;
1067 if let Some(tid) = trace_id {
1068 write!(f, " trace_id={tid}")?;
1069 }
1070 Ok(())
1071 }
1072 FabricEvent::CrossStateDisagreementResolved {
1073 kind,
1074 protocol,
1075 lease_id,
1076 authority,
1077 trace_id,
1078 } => {
1079 write!(
1080 f,
1081 "cross_state_disagreement_resolved: kind={kind} protocol={protocol} lease_id={lease_id} authority={authority}"
1082 )?;
1083 if let Some(tid) = trace_id {
1084 write!(f, " trace_id={tid}")?;
1085 }
1086 Ok(())
1087 }
1088 FabricEvent::QuotaExceeded {
1089 tenant_id,
1090 resource_type,
1091 requested,
1092 limit,
1093 trace_id,
1094 } => {
1095 write!(
1096 f,
1097 "quota_exceeded: tenant={tenant_id} type={resource_type} requested={requested} limit={limit}"
1098 )?;
1099 if let Some(tid) = trace_id {
1100 write!(f, " trace_id={tid}")?;
1101 }
1102 Ok(())
1103 }
1104 FabricEvent::TokenMinted {
1105 tenant_id,
1106 node,
1107 ttl_secs,
1108 trace_id,
1109 } => {
1110 write!(
1111 f,
1112 "token_minted: tenant={tenant_id} node={node} ttl_secs={ttl_secs}"
1113 )?;
1114 if let Some(tid) = trace_id {
1115 write!(f, " trace_id={tid}")?;
1116 }
1117 Ok(())
1118 }
1119 FabricEvent::ProcessStarted {
1120 pid,
1121 scenario,
1122 nodes,
1123 } => {
1124 write!(
1125 f,
1126 "process_started: pid={pid} scenario={scenario} nodes={nodes}"
1127 )
1128 }
1129 FabricEvent::ProcessHeartbeat { pid } => {
1130 write!(f, "process_heartbeat: pid={pid}")
1131 }
1132 FabricEvent::ProcessCompleted {
1133 pid,
1134 exit_code,
1135 duration_secs,
1136 } => {
1137 write!(
1138 f,
1139 "process_completed: pid={pid} exit_code={exit_code} duration_secs={duration_secs}"
1140 )
1141 }
1142 FabricEvent::SchedulerElectionLost { reason, epoch } => {
1143 write!(
1144 f,
1145 "scheduler_election_lost: reason={reason} epoch={epoch}"
1146 )
1147 }
1148 FabricEvent::SchedulerPromotionFailed { reason, epoch } => {
1149 write!(
1150 f,
1151 "scheduler_promotion_failed: reason={reason} epoch={epoch}"
1152 )
1153 }
1154 FabricEvent::SchedulerStaleLeaderDetected {
1155 local_epoch,
1156 node_epoch,
1157 } => {
1158 write!(
1159 f,
1160 "scheduler_stale_leader_detected: local_epoch={local_epoch} node_epoch={node_epoch}"
1161 )
1162 }
1163 FabricEvent::SchedulerPromoted {
1164 epoch,
1165 nodes_fenced,
1166 latency_ms,
1167 } => {
1168 write!(
1169 f,
1170 "scheduler_promoted: epoch={epoch} nodes_fenced={nodes_fenced} latency_ms={latency_ms}"
1171 )
1172 }
1173 }
1174 }
1175}
1176
1177pub trait EventSink {
1190 fn emit(&self, event: &FabricEvent);
1192}
1193
1194pub struct NullSink;
1196
1197impl EventSink for NullSink {
1198 fn emit(&self, _event: &FabricEvent) {}
1199}
1200
1201#[cfg(feature = "std")]
1203pub struct StdoutSink;
1204
1205#[cfg(feature = "std")]
1206impl EventSink for StdoutSink {
1207 fn emit(&self, event: &FabricEvent) {
1208 println!("[fabric] {event}");
1209 }
1210}
1211
1212pub struct EventRingBuffer {
1242 buf: alloc::vec::Vec<Option<FabricEvent>>,
1243 head: usize,
1245 len: usize,
1247}
1248
1249impl EventRingBuffer {
1250 pub const DEFAULT_CAPACITY: usize = 1024;
1252
1253 pub fn new(capacity: usize) -> Self {
1255 let cap = if capacity == 0 { 1 } else { capacity };
1256 let mut buf = alloc::vec::Vec::with_capacity(cap);
1257 buf.resize_with(cap, || None);
1258 Self {
1259 buf,
1260 head: 0,
1261 len: 0,
1262 }
1263 }
1264
1265 pub fn with_default_capacity() -> Self {
1267 Self::new(Self::DEFAULT_CAPACITY)
1268 }
1269
1270 pub fn push(&mut self, event: FabricEvent) {
1274 self.buf[self.head] = Some(event);
1275 self.head = (self.head + 1) % self.buf.len();
1276 if self.len < self.buf.len() {
1277 self.len += 1;
1278 }
1279 }
1280
1281 pub fn len(&self) -> usize {
1283 self.len
1284 }
1285
1286 pub fn is_empty(&self) -> bool {
1288 self.len == 0
1289 }
1290
1291 pub fn capacity(&self) -> usize {
1293 self.buf.len()
1294 }
1295
1296 pub fn iter(&self) -> EventRingIter<'_> {
1298 let start = if self.len < self.buf.len() {
1299 0
1300 } else {
1301 self.head
1302 };
1303 EventRingIter {
1304 buf: &self.buf,
1305 pos: start,
1306 remaining: self.len,
1307 }
1308 }
1309
1310 pub fn drain(&mut self) -> alloc::vec::Vec<FabricEvent> {
1314 let mut events = alloc::vec::Vec::with_capacity(self.len);
1315 let start = if self.len < self.buf.len() {
1316 0
1317 } else {
1318 self.head
1319 };
1320 for i in 0..self.len {
1321 let idx = (start + i) % self.buf.len();
1322 if let Some(ev) = self.buf[idx].take() {
1323 events.push(ev);
1324 }
1325 }
1326 self.head = 0;
1327 self.len = 0;
1328 events
1329 }
1330}
1331
1332impl EventSink for EventRingBuffer {
1333 fn emit(&self, event: &FabricEvent) {
1334 let _ = event;
1339 }
1340}
1341
1342pub struct EventRingIter<'a> {
1344 buf: &'a [Option<FabricEvent>],
1345 pos: usize,
1346 remaining: usize,
1347}
1348
1349impl<'a> Iterator for EventRingIter<'a> {
1350 type Item = &'a FabricEvent;
1351
1352 fn next(&mut self) -> Option<Self::Item> {
1353 if self.remaining == 0 {
1354 return None;
1355 }
1356 let idx = self.pos % self.buf.len();
1357 self.pos += 1;
1358 self.remaining -= 1;
1359 self.buf[idx].as_ref()
1360 }
1361
1362 fn size_hint(&self) -> (usize, Option<usize>) {
1363 (self.remaining, Some(self.remaining))
1364 }
1365}
1366
1367impl<'a> ExactSizeIterator for EventRingIter<'a> {}
1368
1369#[cfg(test)]
1370mod tests {
1371 use super::*;
1372 use alloc::string::ToString;
1373
1374 fn sample_lease_acquired() -> FabricEvent {
1375 FabricEvent::LeaseAcquired {
1376 resource_type: ResourceType::Mem,
1377 lease_id: 1,
1378 node: "10.10.0.11".to_string(),
1379 bytes: 4096,
1380 trace_id: None,
1381 }
1382 }
1383
1384 fn sample_op_completed() -> FabricEvent {
1385 FabricEvent::OpCompleted {
1386 op_type: OpType::Write,
1387 duration_us: 500,
1388 bytes: 1024,
1389 }
1390 }
1391
1392 #[test]
1393 fn ring_buffer_push_and_len() {
1394 let mut rb = EventRingBuffer::new(4);
1395 assert!(rb.is_empty());
1396 assert_eq!(rb.len(), 0);
1397 assert_eq!(rb.capacity(), 4);
1398
1399 rb.push(sample_lease_acquired());
1400 assert_eq!(rb.len(), 1);
1401 assert!(!rb.is_empty());
1402 }
1403
1404 #[test]
1405 fn ring_buffer_overflow_wraps() {
1406 let mut rb = EventRingBuffer::new(2);
1407 rb.push(FabricEvent::RewriteStarted { plan_id: 1 });
1408 rb.push(FabricEvent::RewriteStarted { plan_id: 2 });
1409 rb.push(FabricEvent::RewriteStarted { plan_id: 3 });
1410
1411 assert_eq!(rb.len(), 2);
1413
1414 let events: alloc::vec::Vec<_> = rb.iter().collect();
1415 assert_eq!(events.len(), 2);
1416 match &events[0] {
1418 FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 2),
1419 _ => panic!("unexpected event type"),
1420 }
1421 match &events[1] {
1422 FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 3),
1423 _ => panic!("unexpected event type"),
1424 }
1425 }
1426
1427 #[test]
1428 fn ring_buffer_drain() {
1429 let mut rb = EventRingBuffer::new(4);
1430 rb.push(sample_lease_acquired());
1431 rb.push(sample_op_completed());
1432 assert_eq!(rb.len(), 2);
1433
1434 let drained = rb.drain();
1435 assert_eq!(drained.len(), 2);
1436 assert!(rb.is_empty());
1437 assert_eq!(rb.len(), 0);
1438 }
1439
1440 #[test]
1441 fn ring_buffer_drain_after_overflow() {
1442 let mut rb = EventRingBuffer::new(2);
1443 rb.push(FabricEvent::RewriteStarted { plan_id: 1 });
1444 rb.push(FabricEvent::RewriteStarted { plan_id: 2 });
1445 rb.push(FabricEvent::RewriteStarted { plan_id: 3 });
1446
1447 let drained = rb.drain();
1448 assert_eq!(drained.len(), 2);
1449 match &drained[0] {
1450 FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 2),
1451 _ => panic!("unexpected event type"),
1452 }
1453 match &drained[1] {
1454 FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 3),
1455 _ => panic!("unexpected event type"),
1456 }
1457 }
1458
1459 #[test]
1460 fn ring_buffer_iter_exact_size() {
1461 let mut rb = EventRingBuffer::new(4);
1462 rb.push(sample_lease_acquired());
1463 rb.push(sample_op_completed());
1464 let iter = rb.iter();
1465 assert_eq!(iter.len(), 2);
1466 }
1467
1468 #[test]
1469 fn ring_buffer_default_capacity() {
1470 let rb = EventRingBuffer::with_default_capacity();
1471 assert_eq!(rb.capacity(), 1024);
1472 assert!(rb.is_empty());
1473 }
1474
1475 #[test]
1476 fn ring_buffer_zero_capacity_becomes_one() {
1477 let rb = EventRingBuffer::new(0);
1478 assert_eq!(rb.capacity(), 1);
1479 }
1480
1481 #[test]
1482 fn null_sink_does_not_panic() {
1483 let sink = NullSink;
1484 sink.emit(&sample_lease_acquired());
1485 sink.emit(&sample_op_completed());
1486 }
1487
1488 #[test]
1489 fn event_display() {
1490 let ev = sample_lease_acquired();
1491 let s = alloc::format!("{ev}");
1492 assert!(s.contains("lease_acquired"));
1493 assert!(s.contains("10.10.0.11"));
1494 assert!(s.contains("4096"));
1495
1496 let ev2 = FabricEvent::OpFailed {
1497 op_type: OpType::Read,
1498 error: "timeout".to_string(),
1499 };
1500 let s2 = alloc::format!("{ev2}");
1501 assert!(s2.contains("op_failed"));
1502 assert!(s2.contains("timeout"));
1503 }
1504
1505 #[test]
1506 fn resource_type_display() {
1507 assert_eq!(alloc::format!("{}", ResourceType::Mem), "mem");
1508 assert_eq!(alloc::format!("{}", ResourceType::Block), "block");
1509 assert_eq!(alloc::format!("{}", ResourceType::Gpu), "gpu");
1510 assert_eq!(alloc::format!("{}", ResourceType::Cpu), "cpu");
1511 }
1512
1513 #[test]
1514 fn op_type_display() {
1515 assert_eq!(alloc::format!("{}", OpType::Read), "read");
1516 assert_eq!(alloc::format!("{}", OpType::Write), "write");
1517 assert_eq!(alloc::format!("{}", OpType::ReadBlock), "read_block");
1518 assert_eq!(alloc::format!("{}", OpType::WriteBlock), "write_block");
1519 assert_eq!(alloc::format!("{}", OpType::GpuSubmit), "gpu_submit");
1520 assert_eq!(
1521 alloc::format!("{}", OpType::TaskletSubmit),
1522 "tasklet_submit"
1523 );
1524 }
1525
1526 #[test]
1527 fn rewrite_phase_display() {
1528 assert_eq!(alloc::format!("{}", RewritePhase::Validate), "validate");
1529 assert_eq!(alloc::format!("{}", RewritePhase::Stage), "stage");
1530 assert_eq!(alloc::format!("{}", RewritePhase::Canary), "canary");
1531 assert_eq!(alloc::format!("{}", RewritePhase::Cutover), "cutover");
1532 assert_eq!(alloc::format!("{}", RewritePhase::Cleanup), "cleanup");
1533 assert_eq!(alloc::format!("{}", RewritePhase::Commit), "commit");
1534 assert_eq!(alloc::format!("{}", RewritePhase::Rollback), "rollback");
1535 }
1536
1537 #[test]
1538 fn rewrite_phase_bridge_from_core_is_exhaustive_and_lossless() {
1539 let pairs: &[(grafos_core::RewritePhase, RewritePhase)] = &[
1544 (grafos_core::RewritePhase::Validate, RewritePhase::Validate),
1545 (grafos_core::RewritePhase::Stage, RewritePhase::Stage),
1546 (grafos_core::RewritePhase::Canary, RewritePhase::Canary),
1547 (grafos_core::RewritePhase::Cutover, RewritePhase::Cutover),
1548 (grafos_core::RewritePhase::Cleanup, RewritePhase::Cleanup),
1549 (grafos_core::RewritePhase::Commit, RewritePhase::Commit),
1550 (grafos_core::RewritePhase::Rollback, RewritePhase::Rollback),
1551 ];
1552 for (core_phase, observe_phase) in pairs {
1553 let bridged: RewritePhase = (*core_phase).into();
1554 assert_eq!(bridged, *observe_phase);
1555 let back: grafos_core::RewritePhase = (*observe_phase).into();
1557 assert_eq!(back, *core_phase);
1558 }
1559 }
1560
1561 #[test]
1562 fn resource_kind_to_type_bridge_pins_canonical_mapping() {
1563 let cases: &[(grafos_core::ResourceKind, ResourceType)] = &[
1570 (grafos_core::ResourceKind::Mem, ResourceType::Mem),
1571 (grafos_core::ResourceKind::Block, ResourceType::Block),
1572 (grafos_core::ResourceKind::Net, ResourceType::Net),
1573 (grafos_core::ResourceKind::Cpu, ResourceType::Cpu),
1574 (grafos_core::ResourceKind::Gpu, ResourceType::Gpu),
1575 (grafos_core::ResourceKind::GpuMem, ResourceType::GpuMem),
1576 (grafos_core::ResourceKind::Tasklet, ResourceType::Cpu),
1583 ];
1584 for (kind, expected) in cases {
1585 let bridged: ResourceType = (*kind).into();
1586 assert_eq!(bridged, *expected, "bridge for {kind:?}");
1587 }
1588 }
1589
1590 #[test]
1591 fn resource_kind_to_type_bridge_preserves_display_for_non_tasklet() {
1592 for &kind in &[
1598 grafos_core::ResourceKind::Mem,
1599 grafos_core::ResourceKind::Block,
1600 grafos_core::ResourceKind::Net,
1601 grafos_core::ResourceKind::Cpu,
1602 grafos_core::ResourceKind::Gpu,
1603 grafos_core::ResourceKind::GpuMem,
1604 ] {
1605 let bridged: ResourceType = kind.into();
1606 assert_eq!(
1607 kind.as_str(),
1608 alloc::format!("{}", bridged),
1609 "1:1 variant {kind:?} must preserve string across bridge"
1610 );
1611 }
1612 let tasklet: ResourceType = grafos_core::ResourceKind::Tasklet.into();
1614 assert_eq!(grafos_core::ResourceKind::Tasklet.as_str(), "tasklet");
1615 assert_eq!(alloc::format!("{}", tasklet), "cpu");
1616 }
1617
1618 #[test]
1619 fn rewrite_phase_bridge_preserves_display_string() {
1620 for &core_phase in &[
1625 grafos_core::RewritePhase::Validate,
1626 grafos_core::RewritePhase::Stage,
1627 grafos_core::RewritePhase::Canary,
1628 grafos_core::RewritePhase::Cutover,
1629 grafos_core::RewritePhase::Cleanup,
1630 grafos_core::RewritePhase::Commit,
1631 grafos_core::RewritePhase::Rollback,
1632 ] {
1633 let observe_phase: RewritePhase = core_phase.into();
1634 assert_eq!(
1635 core_phase.as_str(),
1636 alloc::format!("{}", observe_phase),
1637 "bridge must preserve the SIEM string across layers"
1638 );
1639 }
1640 }
1641
1642 #[test]
1643 fn new_event_variants_display() {
1644 let ev = FabricEvent::LeaseRevoked {
1645 resource_type: ResourceType::Mem,
1646 lease_id: 10,
1647 node: "node-a".to_string(),
1648 trace_id: None,
1649 };
1650 let s = alloc::format!("{ev}");
1651 assert!(s.contains("lease_revoked"));
1652 assert!(s.contains("node-a"));
1653
1654 let ev = FabricEvent::LeaseFenced {
1655 resource_type: ResourceType::Block,
1656 lease_id: 11,
1657 node: "node-b".to_string(),
1658 reason: "teardown timeout".to_string(),
1659 trace_id: None,
1660 };
1661 let s = alloc::format!("{ev}");
1662 assert!(s.contains("lease_fenced"));
1663 assert!(s.contains("teardown timeout"));
1664
1665 let ev = FabricEvent::TeardownFailed {
1666 resource_type: ResourceType::Gpu,
1667 lease_id: 12,
1668 node: "node-c".to_string(),
1669 error: "connection lost".to_string(),
1670 };
1671 let s = alloc::format!("{ev}");
1672 assert!(s.contains("teardown_failed"));
1673 assert!(s.contains("connection lost"));
1674
1675 let ev = FabricEvent::AuthFailed {
1676 node: "node-d".to_string(),
1677 reason: "bad cert".to_string(),
1678 trace_id: None,
1679 };
1680 let s = alloc::format!("{ev}");
1681 assert!(s.contains("auth_failed"));
1682 assert!(s.contains("bad cert"));
1683
1684 let ev = FabricEvent::ReplayRejected {
1685 node: "node-e".to_string(),
1686 nonce: 12345,
1687 trace_id: None,
1688 };
1689 let s = alloc::format!("{ev}");
1690 assert!(s.contains("replay_rejected"));
1691 assert!(s.contains("12345"));
1692
1693 let ev = FabricEvent::TokenValidationFailed {
1694 node: "node-f".to_string(),
1695 reason: "expired".to_string(),
1696 };
1697 let s = alloc::format!("{ev}");
1698 assert!(s.contains("token_validation_failed"));
1699 assert!(s.contains("expired"));
1700 }
1701
1702 #[cfg(feature = "std")]
1703 #[test]
1704 fn stdout_sink_does_not_panic() {
1705 let sink = StdoutSink;
1706 sink.emit(&sample_lease_acquired());
1707 }
1708
1709 #[test]
1710 fn tasklet_event_variants_display() {
1711 let ev = FabricEvent::TaskletSubmitted {
1712 tasklet_id: 42,
1713 node: "node-a".to_string(),
1714 runtime_type: "linux-native-tasklet".to_string(),
1715 trace_id: None,
1716 };
1717 let s = alloc::format!("{ev}");
1718 assert!(s.contains("tasklet_submitted"));
1719 assert!(s.contains("42"));
1720
1721 let ev = FabricEvent::TaskletCompleted {
1722 tasklet_id: 42,
1723 status: 0,
1724 duration_us: 1500,
1725 output_bytes: 256,
1726 runtime_type: "linux-native-tasklet".to_string(),
1727 trace_id: None,
1728 };
1729 let s = alloc::format!("{ev}");
1730 assert!(s.contains("tasklet_completed"));
1731 assert!(s.contains("1500"));
1732
1733 let ev = FabricEvent::TaskletFailed {
1734 tasklet_id: 42,
1735 status: 3,
1736 duration_us: 500,
1737 reason: "fuel_exhausted".to_string(),
1738 runtime_type: "linux-native-tasklet".to_string(),
1739 trace_id: None,
1740 };
1741 let s = alloc::format!("{ev}");
1742 assert!(s.contains("tasklet_failed"));
1743 assert!(s.contains("fuel_exhausted"));
1744 }
1745
1746 #[cfg(feature = "std")]
1747 #[test]
1748 fn emit_event_without_sink_is_noop() {
1749 emit_event(sample_lease_acquired());
1751 }
1752}