1use alloc::string::String;
9use core::fmt;
10
/// Process-wide event sink slot, available only with the `std` feature.
///
/// The slot starts empty and is filled through [`set_global_sink`]; because it
/// is a `OnceLock`, only the first successful `set` takes effect. [`emit_event`]
/// reads it and does nothing while it is still empty.
#[cfg(feature = "std")]
static GLOBAL_SINK: std::sync::OnceLock<Box<dyn EventSink + Send + Sync>> =
    std::sync::OnceLock::new();
18
/// Installs `sink` as the process-wide event sink.
///
/// Returns `true` when this call filled the global slot, and `false` when a
/// sink had already been installed (in which case `sink` is dropped and the
/// existing sink stays in place).
#[cfg(feature = "std")]
pub fn set_global_sink(sink: Box<dyn EventSink + Send + Sync>) -> bool {
    // `OnceLock::set` hands the rejected value back in `Err`; we only report
    // whether the installation happened.
    matches!(GLOBAL_SINK.set(sink), Ok(()))
}
28
/// Forwards `event` to the global sink, if one has been installed.
///
/// When no sink was registered through [`set_global_sink`] the event is simply
/// dropped.
#[cfg(feature = "std")]
pub fn emit_event(event: FabricEvent) {
    let Some(sink) = GLOBAL_SINK.get() else {
        // No sink registered: nothing to deliver to.
        return;
    };
    sink.emit(&event);
}
38
/// No-op stand-in for `emit_event` compiled when the `std` feature is
/// disabled: the event is accepted and dropped without being delivered anywhere.
#[cfg(not(feature = "std"))]
pub fn emit_event(_event: FabricEvent) {}
42
/// Stand-in for `set_global_sink` compiled when the `std` feature is disabled.
///
/// There is no global sink slot in this configuration, so the function takes a
/// unit placeholder instead of a boxed sink and always returns `false`.
#[cfg(not(feature = "std"))]
pub fn set_global_sink(_sink: ()) -> bool {
    // Nothing can be installed without `std`.
    false
}
48
/// Structured event handed to [`EventSink::emit`].
///
/// The `Display` implementation renders every variant as a single line of the
/// form `event_name: key=value ...`; the back-quoted name at the start of each
/// variant's documentation is that rendered prefix. Variants carrying a
/// `trace_id` append ` trace_id=<id>` to the line when the field is `Some`.
///
/// The short descriptions below are derived from the variant and field names
/// and from the rendered output; the code that emits these events is not part
/// of this file, so treat them as labels rather than a behavioral contract.
/// Units are not established here either: suffixes such as `_us`, `_ms`,
/// `_secs` and the name `bytes` presumably denote microseconds, milliseconds,
/// seconds and byte counts — TODO confirm with the emitting code.
#[derive(Debug, Clone)]
pub enum FabricEvent {
    /// `lease_acquired`: a resource lease was acquired.
    LeaseAcquired {
        resource_type: ResourceType,
        lease_id: u64,
        node: String,
        bytes: u64,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `lease_dropped`: a resource lease was dropped.
    LeaseDropped {
        resource_type: ResourceType,
        lease_id: u64,
        node: String,
    },
    /// `lease_expired`: a resource lease expired.
    LeaseExpired {
        resource_type: ResourceType,
        lease_id: u64,
        node: String,
    },
    /// `op_completed`: an operation of kind `op_type` completed.
    OpCompleted {
        op_type: OpType,
        duration_us: u64,
        bytes: u64,
    },
    /// `op_failed`: an operation of kind `op_type` failed with `error`.
    OpFailed {
        op_type: OpType,
        error: String,
    },
    /// `rewrite_started`: a rewrite identified by `plan_id` started.
    RewriteStarted {
        plan_id: u64,
    },
    /// `rewrite_completed`: a rewrite identified by `plan_id` completed; `phase`
    /// is rendered alongside it.
    RewriteCompleted {
        plan_id: u64,
        phase: RewritePhase,
    },
    /// `service_registered`: a named service version was registered.
    ServiceRegistered {
        name: String,
        version: String,
    },
    /// `service_deregistered`: a named service was deregistered.
    ServiceDeregistered {
        name: String,
    },
    /// `message_published`: a message was published to `topic`.
    MessagePublished {
        topic: String,
        bytes: u64,
    },
    /// `message_consumed`: a message on `topic` was consumed by `group`.
    MessageConsumed {
        topic: String,
        group: String,
    },
    /// `object_stored`: an object was stored under `bucket`/`key`.
    ObjectStored {
        bucket: String,
        key: String,
        bytes: u64,
    },
    /// `object_retrieved`: an object was retrieved from `bucket`/`key`.
    ObjectRetrieved {
        bucket: String,
        key: String,
        bytes: u64,
    },
    /// `lease_revoked`: a resource lease was revoked.
    LeaseRevoked {
        resource_type: ResourceType,
        lease_id: u64,
        node: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `lease_fenced`: a resource lease was fenced for the given `reason`.
    LeaseFenced {
        resource_type: ResourceType,
        lease_id: u64,
        node: String,
        reason: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `teardown_failed`: tearing down a lease failed with `error`.
    TeardownFailed {
        resource_type: ResourceType,
        lease_id: u64,
        node: String,
        error: String,
    },
    /// `auth_failed`: authentication involving `node` failed.
    AuthFailed {
        node: String,
        reason: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `replay_rejected`: a request carrying `nonce` was rejected as a replay.
    ReplayRejected {
        node: String,
        nonce: u64,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `token_validation_failed`: token validation involving `node` failed.
    TokenValidationFailed {
        node: String,
        reason: String,
    },
    /// `listener_acquired`: a listener on `port` was acquired.
    ListenerAcquired {
        port: u16,
        node: String,
        lease_id: u64,
    },
    /// `listener_revoked`: a listener on `port` was revoked.
    ListenerRevoked {
        port: u16,
        node: String,
        lease_id: u64,
    },
    /// `listener_fenced`: a listener on `port` was fenced for the given `reason`.
    ListenerFenced {
        port: u16,
        node: String,
        lease_id: u64,
        reason: String,
    },
    /// `session_accepted`: a session was accepted on `listener_port`.
    SessionAccepted {
        listener_port: u16,
        session_id: u64,
        node: String,
    },
    /// `session_closed`: a session on `listener_port` was closed.
    SessionClosed {
        listener_port: u16,
        session_id: u64,
        node: String,
    },
    /// `session_drained`: `sessions_drained` sessions were drained from
    /// `listener_port`.
    SessionDrained {
        listener_port: u16,
        sessions_drained: u32,
        node: String,
    },
    /// `service_deployed`: a named service was deployed with `instance_count`
    /// instances.
    ServiceDeployed {
        name: String,
        instance_count: u32,
    },
    /// `service_instance_state_changed`: an instance of a named service changed
    /// to `state`.
    ServiceInstanceStateChanged {
        name: String,
        instance_id: u64,
        state: String,
    },
    /// `service_cutover_started`: cutover of a named service started.
    ServiceCutoverStarted {
        name: String,
    },
    /// `service_cutover_completed`: cutover of a named service completed.
    ServiceCutoverCompleted {
        name: String,
    },
    /// `service_failover_triggered`: failover of a named service was triggered.
    ServiceFailoverTriggered {
        name: String,
        reason: String,
    },
    /// `service_failover_completed`: failover of a named service completed.
    ServiceFailoverCompleted {
        name: String,
    },
    /// `service_ingress_fenced`: ingress for an instance of a named service was
    /// fenced.
    ServiceIngressFenced {
        name: String,
        instance_id: u64,
    },
    /// `service_undeployed`: a named service was undeployed.
    ServiceUndeployed {
        name: String,
    },
    /// `tasklet_submitted`: a tasklet was submitted to `node`.
    TaskletSubmitted {
        tasklet_id: u64,
        node: String,
        runtime_type: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `tasklet_completed`: a tasklet completed.
    TaskletCompleted {
        tasklet_id: u64,
        status: u8,
        duration_us: u64,
        output_bytes: u64,
        runtime_type: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `tasklet_failed`: a tasklet failed for the given `reason`.
    TaskletFailed {
        tasklet_id: u64,
        status: u8,
        duration_us: u64,
        reason: String,
        runtime_type: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `security_profile_active`: reports the active security `mode`.
    SecurityProfileActive {
        mode: String,
    },
    /// `admission_approved`: an admission request was approved.
    ///
    /// Note that `resource_type` is a free-form `String` here, not a
    /// [`ResourceType`]; `tenant_id` is rendered under the key `tenant`.
    AdmissionApproved {
        tenant_id: String,
        node: String,
        resource_type: String,
        bytes: u64,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `admission_denied`: an admission request was denied for the given
    /// `reason`.
    AdmissionDenied {
        tenant_id: String,
        resource_type: String,
        reason: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `placement_decision`: a placement decision selected `node` using
    /// `strategy` with the given `score`.
    PlacementDecision {
        tenant_id: String,
        node: String,
        strategy: String,
        score: f64,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `preemption_triggered`: a lease of `victim_tenant` was preempted in
    /// favor of `preemptor_tenant`.
    PreemptionTriggered {
        victim_lease_id: u64,
        victim_tenant: String,
        preemptor_tenant: String,
        node: String,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `quota_exceeded`: a request for `requested` exceeded the tenant's
    /// `limit`.
    QuotaExceeded {
        tenant_id: String,
        resource_type: String,
        requested: u64,
        limit: u64,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `token_minted`: a token was minted for a tenant.
    TokenMinted {
        tenant_id: String,
        node: String,
        ttl_secs: u32,
        /// Optional trace identifier; rendered only when present.
        trace_id: Option<String>,
    },
    /// `process_started`: a process started. `pid` is a `String`, so it is not
    /// necessarily an OS process id — TODO confirm.
    ProcessStarted {
        pid: String,
        scenario: String,
        nodes: String,
    },
    /// `process_heartbeat`: heartbeat from the process identified by `pid`.
    ProcessHeartbeat {
        pid: String,
    },
    /// `process_completed`: the process identified by `pid` completed with
    /// `exit_code`.
    ProcessCompleted {
        pid: String,
        exit_code: i32,
        duration_secs: u64,
    },
    /// `scheduler_election_lost`: a scheduler election was lost at `epoch`.
    SchedulerElectionLost {
        reason: String,
        epoch: u64,
    },
    /// `scheduler_promotion_failed`: a scheduler promotion failed at `epoch`.
    SchedulerPromotionFailed {
        reason: String,
        epoch: u64,
    },
    /// `scheduler_stale_leader_detected`: a stale leader was detected;
    /// carries the two epochs that were compared.
    SchedulerStaleLeaderDetected {
        local_epoch: u64,
        node_epoch: u64,
    },
    /// `scheduler_promoted`: a scheduler was promoted at `epoch`.
    SchedulerPromoted {
        epoch: u64,
        nodes_fenced: usize,
        latency_ms: u64,
    },
}
502
/// Kind of resource referenced by the lease-related [`FabricEvent`] variants.
///
/// The `Display` implementation renders each variant as a short lowercase
/// label, noted on the variants below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceType {
    /// Rendered as `mem`.
    Mem,
    /// Rendered as `block`.
    Block,
    /// Rendered as `gpu`.
    Gpu,
    /// Rendered as `cpu`.
    Cpu,
    /// Rendered as `net`.
    Net,
}

impl fmt::Display for ResourceType {
    /// Writes the lowercase label of the variant.
    ///
    /// The label goes through [`fmt::Formatter::pad`], so width, fill and
    /// alignment flags (e.g. `{:>6}`) are honored; a plain `{}` produces
    /// exactly the bare label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            ResourceType::Mem => "mem",
            ResourceType::Block => "block",
            ResourceType::Gpu => "gpu",
            ResourceType::Cpu => "cpu",
            ResourceType::Net => "net",
        };
        f.pad(label)
    }
}
531
/// Kind of operation reported by [`FabricEvent::OpCompleted`] and
/// [`FabricEvent::OpFailed`].
///
/// The `Display` implementation renders each variant as a lowercase
/// snake_case label, noted on the variants below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OpType {
    /// Rendered as `read`.
    Read,
    /// Rendered as `write`.
    Write,
    /// Rendered as `read_block`.
    ReadBlock,
    /// Rendered as `write_block`.
    WriteBlock,
    /// Rendered as `gpu_submit`.
    GpuSubmit,
    /// Rendered as `tasklet_submit`.
    TaskletSubmit,
}

impl fmt::Display for OpType {
    /// Writes the snake_case label of the variant.
    ///
    /// The label goes through [`fmt::Formatter::pad`], so width, fill and
    /// alignment flags are honored; a plain `{}` produces exactly the bare
    /// label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            OpType::Read => "read",
            OpType::Write => "write",
            OpType::ReadBlock => "read_block",
            OpType::WriteBlock => "write_block",
            OpType::GpuSubmit => "gpu_submit",
            OpType::TaskletSubmit => "tasklet_submit",
        };
        f.pad(label)
    }
}
564
/// Phase reported by [`FabricEvent::RewriteCompleted`].
///
/// The `Display` implementation renders each variant as a lowercase label,
/// noted on the variants below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RewritePhase {
    /// Rendered as `validate`.
    Validate,
    /// Rendered as `stage`.
    Stage,
    /// Rendered as `canary`.
    Canary,
    /// Rendered as `cutover`.
    Cutover,
    /// Rendered as `cleanup`.
    Cleanup,
    /// Rendered as `commit`.
    Commit,
    /// Rendered as `rollback`.
    Rollback,
}

impl fmt::Display for RewritePhase {
    /// Writes the lowercase label of the variant.
    ///
    /// The label goes through [`fmt::Formatter::pad`], so width, fill and
    /// alignment flags are honored; a plain `{}` produces exactly the bare
    /// label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            RewritePhase::Validate => "validate",
            RewritePhase::Stage => "stage",
            RewritePhase::Canary => "canary",
            RewritePhase::Cutover => "cutover",
            RewritePhase::Cleanup => "cleanup",
            RewritePhase::Commit => "commit",
            RewritePhase::Rollback => "rollback",
        };
        f.pad(label)
    }
}
601
602impl fmt::Display for FabricEvent {
603 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
604 match self {
605 FabricEvent::LeaseAcquired {
606 resource_type,
607 lease_id,
608 node,
609 bytes,
610 trace_id,
611 } => {
612 write!(
613 f,
614 "lease_acquired: type={resource_type} id={lease_id} node={node} bytes={bytes}"
615 )?;
616 if let Some(tid) = trace_id {
617 write!(f, " trace_id={tid}")?;
618 }
619 Ok(())
620 }
621 FabricEvent::LeaseDropped {
622 resource_type,
623 lease_id,
624 node,
625 } => write!(
626 f,
627 "lease_dropped: type={resource_type} id={lease_id} node={node}"
628 ),
629 FabricEvent::LeaseExpired {
630 resource_type,
631 lease_id,
632 node,
633 } => write!(
634 f,
635 "lease_expired: type={resource_type} id={lease_id} node={node}"
636 ),
637 FabricEvent::OpCompleted {
638 op_type,
639 duration_us,
640 bytes,
641 } => write!(
642 f,
643 "op_completed: op={op_type} duration_us={duration_us} bytes={bytes}"
644 ),
645 FabricEvent::OpFailed { op_type, error } => {
646 write!(f, "op_failed: op={op_type} error={error}")
647 }
648 FabricEvent::RewriteStarted { plan_id } => {
649 write!(f, "rewrite_started: plan_id={plan_id}")
650 }
651 FabricEvent::RewriteCompleted { plan_id, phase } => {
652 write!(f, "rewrite_completed: plan_id={plan_id} phase={phase}")
653 }
654 FabricEvent::ServiceRegistered { name, version } => {
655 write!(f, "service_registered: name={name} version={version}")
656 }
657 FabricEvent::ServiceDeregistered { name } => {
658 write!(f, "service_deregistered: name={name}")
659 }
660 FabricEvent::MessagePublished { topic, bytes } => {
661 write!(f, "message_published: topic={topic} bytes={bytes}")
662 }
663 FabricEvent::MessageConsumed { topic, group } => {
664 write!(f, "message_consumed: topic={topic} group={group}")
665 }
666 FabricEvent::ObjectStored { bucket, key, bytes } => {
667 write!(f, "object_stored: bucket={bucket} key={key} bytes={bytes}")
668 }
669 FabricEvent::ObjectRetrieved { bucket, key, bytes } => {
670 write!(
671 f,
672 "object_retrieved: bucket={bucket} key={key} bytes={bytes}"
673 )
674 }
675 FabricEvent::LeaseRevoked {
676 resource_type,
677 lease_id,
678 node,
679 trace_id,
680 } => {
681 write!(
682 f,
683 "lease_revoked: type={resource_type} id={lease_id} node={node}"
684 )?;
685 if let Some(tid) = trace_id {
686 write!(f, " trace_id={tid}")?;
687 }
688 Ok(())
689 }
690 FabricEvent::LeaseFenced {
691 resource_type,
692 lease_id,
693 node,
694 reason,
695 trace_id,
696 } => {
697 write!(
698 f,
699 "lease_fenced: type={resource_type} id={lease_id} node={node} reason={reason}"
700 )?;
701 if let Some(tid) = trace_id {
702 write!(f, " trace_id={tid}")?;
703 }
704 Ok(())
705 }
706 FabricEvent::TeardownFailed {
707 resource_type,
708 lease_id,
709 node,
710 error,
711 } => write!(
712 f,
713 "teardown_failed: type={resource_type} id={lease_id} node={node} error={error}"
714 ),
715 FabricEvent::AuthFailed {
716 node,
717 reason,
718 trace_id,
719 } => {
720 write!(f, "auth_failed: node={node} reason={reason}")?;
721 if let Some(tid) = trace_id {
722 write!(f, " trace_id={tid}")?;
723 }
724 Ok(())
725 }
726 FabricEvent::ReplayRejected {
727 node,
728 nonce,
729 trace_id,
730 } => {
731 write!(f, "replay_rejected: node={node} nonce={nonce}")?;
732 if let Some(tid) = trace_id {
733 write!(f, " trace_id={tid}")?;
734 }
735 Ok(())
736 }
737 FabricEvent::TokenValidationFailed { node, reason } => {
738 write!(f, "token_validation_failed: node={node} reason={reason}")
739 }
740 FabricEvent::ListenerAcquired {
741 port,
742 node,
743 lease_id,
744 } => write!(
745 f,
746 "listener_acquired: port={port} node={node} lease_id={lease_id}"
747 ),
748 FabricEvent::ListenerRevoked {
749 port,
750 node,
751 lease_id,
752 } => write!(
753 f,
754 "listener_revoked: port={port} node={node} lease_id={lease_id}"
755 ),
756 FabricEvent::ListenerFenced {
757 port,
758 node,
759 lease_id,
760 reason,
761 } => write!(
762 f,
763 "listener_fenced: port={port} node={node} lease_id={lease_id} reason={reason}"
764 ),
765 FabricEvent::SessionAccepted {
766 listener_port,
767 session_id,
768 node,
769 } => write!(
770 f,
771 "session_accepted: listener_port={listener_port} session_id={session_id} node={node}"
772 ),
773 FabricEvent::SessionClosed {
774 listener_port,
775 session_id,
776 node,
777 } => write!(
778 f,
779 "session_closed: listener_port={listener_port} session_id={session_id} node={node}"
780 ),
781 FabricEvent::SessionDrained {
782 listener_port,
783 sessions_drained,
784 node,
785 } => write!(
786 f,
787 "session_drained: listener_port={listener_port} sessions_drained={sessions_drained} node={node}"
788 ),
789 FabricEvent::ServiceDeployed {
790 name,
791 instance_count,
792 } => write!(
793 f,
794 "service_deployed: name={name} instance_count={instance_count}"
795 ),
796 FabricEvent::ServiceInstanceStateChanged {
797 name,
798 instance_id,
799 state,
800 } => write!(
801 f,
802 "service_instance_state_changed: name={name} instance_id={instance_id} state={state}"
803 ),
804 FabricEvent::ServiceCutoverStarted { name } => {
805 write!(f, "service_cutover_started: name={name}")
806 }
807 FabricEvent::ServiceCutoverCompleted { name } => {
808 write!(f, "service_cutover_completed: name={name}")
809 }
810 FabricEvent::ServiceFailoverTriggered { name, reason } => {
811 write!(
812 f,
813 "service_failover_triggered: name={name} reason={reason}"
814 )
815 }
816 FabricEvent::ServiceFailoverCompleted { name } => {
817 write!(f, "service_failover_completed: name={name}")
818 }
819 FabricEvent::ServiceIngressFenced { name, instance_id } => {
820 write!(
821 f,
822 "service_ingress_fenced: name={name} instance_id={instance_id}"
823 )
824 }
825 FabricEvent::ServiceUndeployed { name } => {
826 write!(f, "service_undeployed: name={name}")
827 }
828 FabricEvent::TaskletSubmitted {
829 tasklet_id,
830 node,
831 runtime_type,
832 trace_id,
833 } => {
834 write!(
835 f,
836 "tasklet_submitted: tasklet_id={tasklet_id} node={node} runtime_type={runtime_type}"
837 )?;
838 if let Some(tid) = trace_id {
839 write!(f, " trace_id={tid}")?;
840 }
841 Ok(())
842 }
843 FabricEvent::TaskletCompleted {
844 tasklet_id,
845 status,
846 duration_us,
847 output_bytes,
848 runtime_type,
849 trace_id,
850 } => {
851 write!(
852 f,
853 "tasklet_completed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} output_bytes={output_bytes} runtime_type={runtime_type}"
854 )?;
855 if let Some(tid) = trace_id {
856 write!(f, " trace_id={tid}")?;
857 }
858 Ok(())
859 }
860 FabricEvent::TaskletFailed {
861 tasklet_id,
862 status,
863 duration_us,
864 reason,
865 runtime_type,
866 trace_id,
867 } => {
868 write!(
869 f,
870 "tasklet_failed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} reason={reason} runtime_type={runtime_type}"
871 )?;
872 if let Some(tid) = trace_id {
873 write!(f, " trace_id={tid}")?;
874 }
875 Ok(())
876 }
877 FabricEvent::SecurityProfileActive { mode } => {
878 write!(f, "security_profile_active: mode={mode}")
879 }
880 FabricEvent::AdmissionApproved {
881 tenant_id,
882 node,
883 resource_type,
884 bytes,
885 trace_id,
886 } => {
887 write!(
888 f,
889 "admission_approved: tenant={tenant_id} node={node} type={resource_type} bytes={bytes}"
890 )?;
891 if let Some(tid) = trace_id {
892 write!(f, " trace_id={tid}")?;
893 }
894 Ok(())
895 }
896 FabricEvent::AdmissionDenied {
897 tenant_id,
898 resource_type,
899 reason,
900 trace_id,
901 } => {
902 write!(
903 f,
904 "admission_denied: tenant={tenant_id} type={resource_type} reason={reason}"
905 )?;
906 if let Some(tid) = trace_id {
907 write!(f, " trace_id={tid}")?;
908 }
909 Ok(())
910 }
911 FabricEvent::PlacementDecision {
912 tenant_id,
913 node,
914 strategy,
915 score,
916 trace_id,
917 } => {
918 write!(
919 f,
920 "placement_decision: tenant={tenant_id} node={node} strategy={strategy} score={score}"
921 )?;
922 if let Some(tid) = trace_id {
923 write!(f, " trace_id={tid}")?;
924 }
925 Ok(())
926 }
927 FabricEvent::PreemptionTriggered {
928 victim_lease_id,
929 victim_tenant,
930 preemptor_tenant,
931 node,
932 trace_id,
933 } => {
934 write!(
935 f,
936 "preemption_triggered: victim_lease_id={victim_lease_id} victim_tenant={victim_tenant} preemptor_tenant={preemptor_tenant} node={node}"
937 )?;
938 if let Some(tid) = trace_id {
939 write!(f, " trace_id={tid}")?;
940 }
941 Ok(())
942 }
943 FabricEvent::QuotaExceeded {
944 tenant_id,
945 resource_type,
946 requested,
947 limit,
948 trace_id,
949 } => {
950 write!(
951 f,
952 "quota_exceeded: tenant={tenant_id} type={resource_type} requested={requested} limit={limit}"
953 )?;
954 if let Some(tid) = trace_id {
955 write!(f, " trace_id={tid}")?;
956 }
957 Ok(())
958 }
959 FabricEvent::TokenMinted {
960 tenant_id,
961 node,
962 ttl_secs,
963 trace_id,
964 } => {
965 write!(
966 f,
967 "token_minted: tenant={tenant_id} node={node} ttl_secs={ttl_secs}"
968 )?;
969 if let Some(tid) = trace_id {
970 write!(f, " trace_id={tid}")?;
971 }
972 Ok(())
973 }
974 FabricEvent::ProcessStarted {
975 pid,
976 scenario,
977 nodes,
978 } => {
979 write!(
980 f,
981 "process_started: pid={pid} scenario={scenario} nodes={nodes}"
982 )
983 }
984 FabricEvent::ProcessHeartbeat { pid } => {
985 write!(f, "process_heartbeat: pid={pid}")
986 }
987 FabricEvent::ProcessCompleted {
988 pid,
989 exit_code,
990 duration_secs,
991 } => {
992 write!(
993 f,
994 "process_completed: pid={pid} exit_code={exit_code} duration_secs={duration_secs}"
995 )
996 }
997 FabricEvent::SchedulerElectionLost { reason, epoch } => {
998 write!(
999 f,
1000 "scheduler_election_lost: reason={reason} epoch={epoch}"
1001 )
1002 }
1003 FabricEvent::SchedulerPromotionFailed { reason, epoch } => {
1004 write!(
1005 f,
1006 "scheduler_promotion_failed: reason={reason} epoch={epoch}"
1007 )
1008 }
1009 FabricEvent::SchedulerStaleLeaderDetected {
1010 local_epoch,
1011 node_epoch,
1012 } => {
1013 write!(
1014 f,
1015 "scheduler_stale_leader_detected: local_epoch={local_epoch} node_epoch={node_epoch}"
1016 )
1017 }
1018 FabricEvent::SchedulerPromoted {
1019 epoch,
1020 nodes_fenced,
1021 latency_ms,
1022 } => {
1023 write!(
1024 f,
1025 "scheduler_promoted: epoch={epoch} nodes_fenced={nodes_fenced} latency_ms={latency_ms}"
1026 )
1027 }
1028 }
1029 }
1030}
1031
/// Destination for [`FabricEvent`]s.
///
/// Implementations in this file include [`NullSink`] (discards events) and,
/// with the `std` feature, `StdoutSink` (prints them). A boxed
/// `EventSink + Send + Sync` can also be installed globally via
/// `set_global_sink` when `std` is enabled.
pub trait EventSink {
    /// Handles one event. Takes `&self`, so implementations that need to
    /// record state must manage their own interior mutability.
    fn emit(&self, event: &FabricEvent);
}
1048
/// An [`EventSink`] that discards every event.
pub struct NullSink;

impl EventSink for NullSink {
    /// Intentionally does nothing with the event.
    fn emit(&self, _event: &FabricEvent) {}
}
1055
/// An [`EventSink`] that writes each event to standard output as a line of
/// the form `[fabric] <event>`. Only available with the `std` feature.
#[cfg(feature = "std")]
pub struct StdoutSink;

#[cfg(feature = "std")]
impl EventSink for StdoutSink {
    /// Writes `[fabric] {event}` followed by a newline to stdout.
    ///
    /// Delivery is best-effort: `println!` panics when stdout cannot be
    /// written (for example a closed pipe), and an event sink should not be
    /// able to take the emitting thread down, so write errors are
    /// deliberately ignored here. `emit` has no way to report them anyway.
    fn emit(&self, event: &FabricEvent) {
        // Hold the lock for the whole line so it is not interleaved with
        // output from other threads.
        let mut out = std::io::stdout().lock();
        // Fully-qualified call avoids needing `std::io::Write` in scope.
        let _ = std::io::Write::write_fmt(&mut out, format_args!("[fabric] {event}\n"));
    }
}
1066
/// Fixed-capacity ring buffer of [`FabricEvent`]s.
///
/// Once the buffer is full, each `push` overwrites the oldest stored event.
pub struct EventRingBuffer {
    /// Backing storage; its length is the buffer capacity. Unused slots hold
    /// `None`.
    buf: alloc::vec::Vec<Option<FabricEvent>>,
    /// Index of the slot the next `push` writes to.
    head: usize,
    /// Number of events currently stored; never exceeds `buf.len()`.
    len: usize,
}
1103
1104impl EventRingBuffer {
1105 pub const DEFAULT_CAPACITY: usize = 1024;
1107
1108 pub fn new(capacity: usize) -> Self {
1110 let cap = if capacity == 0 { 1 } else { capacity };
1111 let mut buf = alloc::vec::Vec::with_capacity(cap);
1112 buf.resize_with(cap, || None);
1113 Self {
1114 buf,
1115 head: 0,
1116 len: 0,
1117 }
1118 }
1119
1120 pub fn with_default_capacity() -> Self {
1122 Self::new(Self::DEFAULT_CAPACITY)
1123 }
1124
1125 pub fn push(&mut self, event: FabricEvent) {
1129 self.buf[self.head] = Some(event);
1130 self.head = (self.head + 1) % self.buf.len();
1131 if self.len < self.buf.len() {
1132 self.len += 1;
1133 }
1134 }
1135
1136 pub fn len(&self) -> usize {
1138 self.len
1139 }
1140
1141 pub fn is_empty(&self) -> bool {
1143 self.len == 0
1144 }
1145
1146 pub fn capacity(&self) -> usize {
1148 self.buf.len()
1149 }
1150
1151 pub fn iter(&self) -> EventRingIter<'_> {
1153 let start = if self.len < self.buf.len() {
1154 0
1155 } else {
1156 self.head
1157 };
1158 EventRingIter {
1159 buf: &self.buf,
1160 pos: start,
1161 remaining: self.len,
1162 }
1163 }
1164
1165 pub fn drain(&mut self) -> alloc::vec::Vec<FabricEvent> {
1169 let mut events = alloc::vec::Vec::with_capacity(self.len);
1170 let start = if self.len < self.buf.len() {
1171 0
1172 } else {
1173 self.head
1174 };
1175 for i in 0..self.len {
1176 let idx = (start + i) % self.buf.len();
1177 if let Some(ev) = self.buf[idx].take() {
1178 events.push(ev);
1179 }
1180 }
1181 self.head = 0;
1182 self.len = 0;
1183 events
1184 }
1185}
1186
/// [`EventSink`] implementation for [`EventRingBuffer`].
///
/// `emit` only receives `&self`, while [`EventRingBuffer::push`] needs
/// `&mut self`, so this implementation cannot store anything: the event is
/// discarded. NOTE(review): callers that want events captured through the
/// trait presumably need to wrap the buffer in an interior-mutability type
/// and call `push` themselves — confirm the intended usage.
impl EventSink for EventRingBuffer {
    fn emit(&self, event: &FabricEvent) {
        // Deliberately ignore the event; nothing is recorded here.
        let _ = event;
    }
}
1196
1197pub struct EventRingIter<'a> {
1199 buf: &'a [Option<FabricEvent>],
1200 pos: usize,
1201 remaining: usize,
1202}
1203
1204impl<'a> Iterator for EventRingIter<'a> {
1205 type Item = &'a FabricEvent;
1206
1207 fn next(&mut self) -> Option<Self::Item> {
1208 if self.remaining == 0 {
1209 return None;
1210 }
1211 let idx = self.pos % self.buf.len();
1212 self.pos += 1;
1213 self.remaining -= 1;
1214 self.buf[idx].as_ref()
1215 }
1216
1217 fn size_hint(&self) -> (usize, Option<usize>) {
1218 (self.remaining, Some(self.remaining))
1219 }
1220}
1221
1222impl<'a> ExactSizeIterator for EventRingIter<'a> {}
1223
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::ToString;

    /// A `LeaseAcquired` event without a trace id.
    fn sample_lease_acquired() -> FabricEvent {
        FabricEvent::LeaseAcquired {
            resource_type: ResourceType::Mem,
            lease_id: 1,
            node: "10.10.0.11".to_string(),
            bytes: 4096,
            trace_id: None,
        }
    }

    /// An `OpCompleted` event for a write.
    fn sample_op_completed() -> FabricEvent {
        FabricEvent::OpCompleted {
            op_type: OpType::Write,
            duration_us: 500,
            bytes: 1024,
        }
    }

    /// Extracts the plan id of a `RewriteStarted` event; any other variant
    /// fails the test.
    fn rewrite_plan_id(ev: &FabricEvent) -> u64 {
        match ev {
            FabricEvent::RewriteStarted { plan_id } => *plan_id,
            _ => panic!("unexpected event type"),
        }
    }

    /// A capacity-2 buffer that has received plan ids 1, 2 and 3, so the
    /// first one has been overwritten.
    fn overflowed_buffer() -> EventRingBuffer {
        let mut rb = EventRingBuffer::new(2);
        rb.push(FabricEvent::RewriteStarted { plan_id: 1 });
        rb.push(FabricEvent::RewriteStarted { plan_id: 2 });
        rb.push(FabricEvent::RewriteStarted { plan_id: 3 });
        rb
    }

    #[test]
    fn ring_buffer_push_and_len() {
        let mut rb = EventRingBuffer::new(4);
        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);
        assert_eq!(rb.capacity(), 4);

        rb.push(sample_lease_acquired());
        assert_eq!(rb.len(), 1);
        assert!(!rb.is_empty());
    }

    #[test]
    fn ring_buffer_overflow_wraps() {
        let rb = overflowed_buffer();
        assert_eq!(rb.len(), 2);

        // The oldest event (plan 1) was overwritten; order is oldest-first.
        let ids: alloc::vec::Vec<u64> = rb.iter().map(rewrite_plan_id).collect();
        assert_eq!(ids, [2, 3]);
    }

    #[test]
    fn ring_buffer_drain() {
        let mut rb = EventRingBuffer::new(4);
        rb.push(sample_lease_acquired());
        rb.push(sample_op_completed());
        assert_eq!(rb.len(), 2);

        let drained = rb.drain();
        assert_eq!(drained.len(), 2);
        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);
    }

    #[test]
    fn ring_buffer_drain_after_overflow() {
        let mut rb = overflowed_buffer();

        let drained = rb.drain();
        assert_eq!(drained.len(), 2);
        let ids: alloc::vec::Vec<u64> = drained.iter().map(rewrite_plan_id).collect();
        assert_eq!(ids, [2, 3]);
    }

    #[test]
    fn ring_buffer_iter_exact_size() {
        let mut rb = EventRingBuffer::new(4);
        rb.push(sample_lease_acquired());
        rb.push(sample_op_completed());
        let iter = rb.iter();
        assert_eq!(iter.len(), 2);
    }

    #[test]
    fn ring_buffer_default_capacity() {
        let rb = EventRingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 1024);
        assert!(rb.is_empty());
    }

    #[test]
    fn ring_buffer_zero_capacity_becomes_one() {
        let rb = EventRingBuffer::new(0);
        assert_eq!(rb.capacity(), 1);
    }

    #[test]
    fn null_sink_does_not_panic() {
        let sink = NullSink;
        sink.emit(&sample_lease_acquired());
        sink.emit(&sample_op_completed());
    }

    #[test]
    fn event_display() {
        let ev = sample_lease_acquired();
        let s = alloc::format!("{ev}");
        assert!(s.contains("lease_acquired"));
        assert!(s.contains("10.10.0.11"));
        assert!(s.contains("4096"));

        let ev2 = FabricEvent::OpFailed {
            op_type: OpType::Read,
            error: "timeout".to_string(),
        };
        let s2 = alloc::format!("{ev2}");
        assert!(s2.contains("op_failed"));
        assert!(s2.contains("timeout"));
    }

    /// The ` trace_id=` suffix appears only when the field is `Some`.
    #[test]
    fn trace_id_suffix_is_appended_only_when_present() {
        let without = alloc::format!("{}", sample_lease_acquired());
        assert_eq!(
            without,
            "lease_acquired: type=mem id=1 node=10.10.0.11 bytes=4096"
        );

        let traced = FabricEvent::LeaseAcquired {
            resource_type: ResourceType::Mem,
            lease_id: 1,
            node: "10.10.0.11".to_string(),
            bytes: 4096,
            trace_id: Some("abc-123".to_string()),
        };
        assert_eq!(
            alloc::format!("{traced}"),
            "lease_acquired: type=mem id=1 node=10.10.0.11 bytes=4096 trace_id=abc-123"
        );
    }

    #[test]
    fn resource_type_display() {
        assert_eq!(alloc::format!("{}", ResourceType::Mem), "mem");
        assert_eq!(alloc::format!("{}", ResourceType::Block), "block");
        assert_eq!(alloc::format!("{}", ResourceType::Gpu), "gpu");
        assert_eq!(alloc::format!("{}", ResourceType::Cpu), "cpu");
        assert_eq!(alloc::format!("{}", ResourceType::Net), "net");
    }

    #[test]
    fn op_type_display() {
        assert_eq!(alloc::format!("{}", OpType::Read), "read");
        assert_eq!(alloc::format!("{}", OpType::Write), "write");
        assert_eq!(alloc::format!("{}", OpType::ReadBlock), "read_block");
        assert_eq!(alloc::format!("{}", OpType::WriteBlock), "write_block");
        assert_eq!(alloc::format!("{}", OpType::GpuSubmit), "gpu_submit");
        assert_eq!(
            alloc::format!("{}", OpType::TaskletSubmit),
            "tasklet_submit"
        );
    }

    #[test]
    fn rewrite_phase_display() {
        assert_eq!(alloc::format!("{}", RewritePhase::Validate), "validate");
        assert_eq!(alloc::format!("{}", RewritePhase::Stage), "stage");
        assert_eq!(alloc::format!("{}", RewritePhase::Canary), "canary");
        assert_eq!(alloc::format!("{}", RewritePhase::Cutover), "cutover");
        assert_eq!(alloc::format!("{}", RewritePhase::Cleanup), "cleanup");
        assert_eq!(alloc::format!("{}", RewritePhase::Commit), "commit");
        assert_eq!(alloc::format!("{}", RewritePhase::Rollback), "rollback");
    }

    #[test]
    fn new_event_variants_display() {
        let ev = FabricEvent::LeaseRevoked {
            resource_type: ResourceType::Mem,
            lease_id: 10,
            node: "node-a".to_string(),
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("lease_revoked"));
        assert!(s.contains("node-a"));

        let ev = FabricEvent::LeaseFenced {
            resource_type: ResourceType::Block,
            lease_id: 11,
            node: "node-b".to_string(),
            reason: "teardown timeout".to_string(),
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("lease_fenced"));
        assert!(s.contains("teardown timeout"));

        let ev = FabricEvent::TeardownFailed {
            resource_type: ResourceType::Gpu,
            lease_id: 12,
            node: "node-c".to_string(),
            error: "connection lost".to_string(),
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("teardown_failed"));
        assert!(s.contains("connection lost"));

        let ev = FabricEvent::AuthFailed {
            node: "node-d".to_string(),
            reason: "bad cert".to_string(),
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("auth_failed"));
        assert!(s.contains("bad cert"));

        let ev = FabricEvent::ReplayRejected {
            node: "node-e".to_string(),
            nonce: 12345,
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("replay_rejected"));
        assert!(s.contains("12345"));

        let ev = FabricEvent::TokenValidationFailed {
            node: "node-f".to_string(),
            reason: "expired".to_string(),
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("token_validation_failed"));
        assert!(s.contains("expired"));
    }

    #[cfg(feature = "std")]
    #[test]
    fn stdout_sink_does_not_panic() {
        let sink = StdoutSink;
        sink.emit(&sample_lease_acquired());
    }

    #[test]
    fn tasklet_event_variants_display() {
        let ev = FabricEvent::TaskletSubmitted {
            tasklet_id: 42,
            node: "node-a".to_string(),
            runtime_type: "linux-native-tasklet".to_string(),
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("tasklet_submitted"));
        assert!(s.contains("42"));

        let ev = FabricEvent::TaskletCompleted {
            tasklet_id: 42,
            status: 0,
            duration_us: 1500,
            output_bytes: 256,
            runtime_type: "linux-native-tasklet".to_string(),
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("tasklet_completed"));
        assert!(s.contains("1500"));

        let ev = FabricEvent::TaskletFailed {
            tasklet_id: 42,
            status: 3,
            duration_us: 500,
            reason: "fuel_exhausted".to_string(),
            runtime_type: "linux-native-tasklet".to_string(),
            trace_id: None,
        };
        let s = alloc::format!("{ev}");
        assert!(s.contains("tasklet_failed"));
        assert!(s.contains("fuel_exhausted"));
    }

    /// Emitting must be safe whether or not a global sink was installed.
    #[cfg(feature = "std")]
    #[test]
    fn emit_event_without_sink_is_noop() {
        emit_event(sample_lease_acquired());
    }
}