grafos_observe/
file_socket_sink.rs

1//! File + Unix socket event sink.
2//!
3//! [`FileAndSocketSink`] writes JSON-lines to an append-only log file and,
4//! optionally, to a connected Unix stream socket client. Enabled by the
5//! `file-socket` feature flag (requires `std`).
6
7use std::io::{self, Write};
8use std::net::Shutdown;
9use std::os::unix::net::{UnixListener, UnixStream};
10use std::path::Path;
11use std::sync::{Arc, Mutex};
12
13use crate::event::{EventSink, FabricEvent};
14use crate::json_log::JsonEventSink;
15
16/// An [`EventSink`] that writes JSON-lines to a log file and a Unix socket.
17///
18/// The log file is opened in append mode. A background thread accepts
19/// connections on a Unix domain socket; at most one client is served at a time.
20/// If no client is connected, socket writes are silently skipped.
21///
22/// # Examples
23///
24/// ```no_run
25/// use grafos_observe::file_socket_sink::FileAndSocketSink;
26/// use std::path::Path;
27///
28/// let sink = FileAndSocketSink::new(
29///     Path::new("/tmp/fabric-events.jsonl"),
30///     Path::new("/tmp/fabric-events.sock"),
31/// ).expect("failed to create sink");
32/// ```
33pub struct FileAndSocketSink {
34    log_file: Mutex<std::fs::File>,
35    client: Arc<Mutex<Option<UnixStream>>>,
36    // Hold the join handle so the background thread lives as long as the sink.
37    _accept_thread: std::thread::JoinHandle<()>,
38}
39
40impl FileAndSocketSink {
41    /// Create a new sink writing to `log_path` (append-only) and listening on
42    /// `socket_path` for a single Unix stream client at a time.
43    ///
44    /// The socket file is removed if it already exists before binding.
45    pub fn new(log_path: &Path, socket_path: &Path) -> io::Result<Self> {
46        let log_file = std::fs::OpenOptions::new()
47            .create(true)
48            .append(true)
49            .open(log_path)?;
50
51        // Remove stale socket file if present.
52        if socket_path.exists() {
53            std::fs::remove_file(socket_path)?;
54        }
55        let listener = UnixListener::bind(socket_path)?;
56
57        let client: Arc<Mutex<Option<UnixStream>>> = Arc::new(Mutex::new(None));
58        let client_bg = Arc::clone(&client);
59
60        let accept_thread = std::thread::Builder::new()
61            .name("fabric-event-sock".into())
62            .spawn(move || {
63                // Accept loop: replace the current client on each new connection.
64                for stream in listener.incoming() {
65                    match stream {
66                        Ok(new_stream) => {
67                            // Shut down the previous client, if any.
68                            if let Ok(mut guard) = client_bg.lock() {
69                                if let Some(old) = guard.take() {
70                                    let _ = old.shutdown(Shutdown::Both);
71                                }
72                                *guard = Some(new_stream);
73                            }
74                        }
75                        Err(_) => {
76                            // Listener error (e.g. socket removed) — exit accept loop.
77                            break;
78                        }
79                    }
80                }
81            })?;
82
83        Ok(Self {
84            log_file: Mutex::new(log_file),
85            client,
86            _accept_thread: accept_thread,
87        })
88    }
89}
90
91impl EventSink for FileAndSocketSink {
92    fn emit(&self, event: &FabricEvent) {
93        let mut line = JsonEventSink::format_event(event);
94        line.push('\n');
95
96        // Write to log file (append mode).
97        if let Ok(mut f) = self.log_file.lock() {
98            if let Err(e) = f.write_all(line.as_bytes()) {
99                eprintln!("[grafos-observe] file write failed: {e}");
100            }
101        }
102
103        // Write to connected Unix socket client, if any.
104        if let Ok(mut guard) = self.client.lock() {
105            if let Some(ref mut stream) = *guard {
106                if stream.write_all(line.as_bytes()).is_err() {
107                    // Broken pipe or client disconnected — drop the reference.
108                    *guard = None;
109                }
110            }
111        }
112    }
113}