Recipe 55: Consuming the Audit Chain
Situation
A compliance team needs a tamper-evident trail of every committed graph rewrite, lease lifecycle transition, and admission decision the scheduler made. The fabric already emits a SHA-256-linked audit chain in JSONL; the team needs to ingest it, verify the linkage end-to-end, and pull typed fields out of the records that matter to their reports.
In grafOS, the rewrite engine seals one EdgeRewritten record per
affected edge on every successful commit, alongside the existing
admission / lease / preemption / drain records. Each record’s
current_event_hash chains to the next’s prev_event_hash. A
consumer that walks the chain, recomputes each hash, and decodes the
typed event_data can prove no record was added, removed, reordered,
or modified after sealing.
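The linkage walk described above can be sketched with the standard library alone. This is a minimal illustration, not the grafos-audit implementation: `ToyRecord`, `canonical_bytes`, `seal`, and `verify_chain` here are hypothetical names, the canonical encoding is invented for the example, the anchor value 0 stands in for an unanchored start, and a 64-bit FNV-1a hash stands in for SHA-256 so the block has no external dependencies.

```rust
// Stand-in 64-bit FNV-1a hash. ASSUMPTION: used only so this sketch builds
// with the standard library; the real chain links records with SHA-256.
fn toy_hash(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for b in bytes {
        h ^= *b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

// Hypothetical, simplified record; not the real AuditRecord layout.
#[derive(Clone, Debug)]
struct ToyRecord {
    sequence: u64,
    prev_event_hash: u64,
    payload: Vec<u8>,
    current_event_hash: u64,
}

// Hypothetical canonical encoding: sequence, previous hash, then payload.
fn canonical_bytes(sequence: u64, prev: u64, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(16 + payload.len());
    out.extend_from_slice(&sequence.to_be_bytes());
    out.extend_from_slice(&prev.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

// Producer side: compute the hash over the canonical bytes and seal it in.
fn seal(sequence: u64, prev: u64, payload: &[u8]) -> ToyRecord {
    let current = toy_hash(&canonical_bytes(sequence, prev, payload));
    ToyRecord {
        sequence,
        prev_event_hash: prev,
        payload: payload.to_vec(),
        current_event_hash: current,
    }
}

// Consumer side: returns the new chain head on success, or the 0-indexed
// offset of the first record whose linkage or recomputed hash is wrong.
fn verify_chain(anchor: u64, records: &[ToyRecord]) -> Result<u64, usize> {
    let mut expected_prev = anchor;
    for (i, r) in records.iter().enumerate() {
        let recomputed =
            toy_hash(&canonical_bytes(r.sequence, r.prev_event_hash, &r.payload));
        if r.prev_event_hash != expected_prev || r.current_event_hash != recomputed {
            return Err(i);
        }
        expected_prev = r.current_event_hash;
    }
    Ok(expected_prev)
}

// Three linked records starting from the unanchored value 0.
fn sample_chain() -> Vec<ToyRecord> {
    let a = seal(0, 0, b"admit");
    let b = seal(1, a.current_event_hash, b"rewrite");
    let c = seal(2, b.current_event_hash, b"drain");
    vec![a, b, c]
}
```

Because each record's hash covers the previous hash, modifying a payload, dropping a record, or swapping two records breaks the walk at the first affected offset.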
What You Build
A compliance / analytics pipeline that:
- Reads a grafos-audit JSONL stream (one AuditRecord per line);
- Verifies linkage against a persisted anchor (the chain head from the prior batch, durable across process restarts);
- Refuses to advance the anchor or emit observations when any record fails verification;
- Filters records carrying event_data = EdgeRewritten { edge_id, edge_record_bytes } and decodes the embedded bytes back to a typed EdgeRecord;
- Surfaces a Vec<EdgeRewriteObservation> in source order.
The compiled recipe lives in
cookbook/recipe-55-consuming-audit-chain.
Core grafOS API Path
The pipeline is a thin layer on top of the reference collector at
crates/grafos-audit-collector. The collector handles parse +
linkage verify + anchor advance; this recipe adds the per-record
filter + typed decode the compliance workload actually cares about.
    use grafos_audit::{AnchorStore, AuditEventData, FileAnchorStore};
    use grafos_audit_collector::{Collector, IngestError};
    use grafos_core::edge_record::EdgeRecord;
    use grafos_core::EdgeId;

    let anchor = FileAnchorStore::load_or_unanchored("/var/lib/audit/anchor.dat")?;
    let mut collector = Collector::new(anchor);

    // Parse the batch once; the collector consumes it for verify, the
    // pipeline reuses the parsed list to filter / decode after verify
    // succeeds.
    let records = grafos_audit::jsonl::read_chain(reader)?;
    collector.ingest_records(records.clone())?; // verify + advance anchor

    for record in &records {
        if let Some(AuditEventData::EdgeRewritten { edge_id, edge_record_bytes }) =
            record.event_data.as_ref()
        {
            let (decoded, _consumed) = EdgeRecord::decode(edge_record_bytes)?;
            // decoded.edge_id, decoded.src_port, decoded.dst_port,
            // decoded.protocol, decoded.features, decoded.rights,
            // decoded.state, decoded.lease_ref, decoded.binding_ref,
            // decoded.constraints (all typed).
        }
    }
    # Ok::<(), Box<dyn std::error::Error>>(())

Program
    use cookbook_recipe_55_consuming_audit_chain::Pipeline;
    use grafos_audit::FileAnchorStore;
    use std::fs::File;
    use std::io::BufReader;

    let anchor = FileAnchorStore::load_or_unanchored("/var/lib/audit/anchor.dat")?;
    let mut pipeline = Pipeline::new(anchor);

    let file = File::open("/var/log/audit/2026-05-12.jsonl")?;
    let reader = BufReader::new(file);
    let n = pipeline.ingest_jsonl(reader)?;

    println!(
        "ingested {n} records; observations={}; verification_failures={}",
        pipeline.edge_rewrite_observations().len(),
        pipeline.metrics().chain_verification_failures,
    );

    for obs in pipeline.edge_rewrite_observations() {
        println!(
            "edge_rewritten seq={} edge={} features={:#010x}",
            obs.record_sequence,
            obs.edge_id.to_hex_0x(),
            obs.decoded.features,
        );
    }
    # Ok::<(), Box<dyn std::error::Error>>(())

Design
The pipeline is constructed in two distinct phases by design:
- Verify: Collector::ingest_records walks the chain head to tail, recomputing each current_event_hash from canonical bytes and checking it against the declared field. If any record fails, the persisted anchor does NOT advance and no observations are emitted. The collector counter chain_verification_failures increments; operators wire this to an alert because it is the chain-integrity signal.
- Decode: only reached after verify succeeded, so the bytes the recipe decodes are the same bytes the chain hash already signed off on. A decode failure here means the embedded edge_record_bytes are a valid SHA-256 input but not a valid EdgeRecord shape; that is a producer bug, not a tamper. The pipeline still fails closed and returns PipelineError::EdgeRecordDecode { record_sequence, edge_id, .. } so operators can localize the producer.
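One way to get the fail-closed behavior of the two phases is to stage decoded observations and commit the anchor and the observations together only after both phases pass. The sketch below illustrates that idea under stated assumptions: `SketchPipeline`, `SketchRecord`, `SketchError`, and `decode_features` are hypothetical names, hash recomputation is omitted (only prev/current linkage is checked), and the 4-byte "features" payload is an invented stand-in for the real EdgeRecord encoding.

```rust
#[derive(Debug, PartialEq)]
enum SketchError {
    Linkage { failing_index: usize },
    Decode { record_sequence: u64 },
}

// Hypothetical record: Some(bytes) stands in for an EdgeRewritten payload,
// None for any other event kind.
struct SketchRecord {
    sequence: u64,
    prev: u64,
    current: u64,
    edge_bytes: Option<Vec<u8>>,
}

#[derive(Debug, PartialEq)]
struct Observation {
    record_sequence: u64,
    features: u32,
}

struct SketchPipeline {
    anchor: u64,
    observations: Vec<Observation>,
    chain_verification_failures: u64,
}

// Hypothetical decoder: exactly four big-endian bytes become `features`.
fn decode_features(bytes: &[u8]) -> Option<u32> {
    if bytes.len() != 4 {
        return None;
    }
    Some(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}

impl SketchPipeline {
    fn new(anchor: u64) -> Self {
        SketchPipeline { anchor, observations: Vec::new(), chain_verification_failures: 0 }
    }

    fn ingest(&mut self, batch: &[SketchRecord]) -> Result<usize, SketchError> {
        // Phase 1: verify linkage against the anchor, touching no state
        // except the failure counter.
        let mut head = self.anchor;
        for (i, r) in batch.iter().enumerate() {
            if r.prev != head {
                self.chain_verification_failures += 1;
                return Err(SketchError::Linkage { failing_index: i });
            }
            head = r.current;
        }

        // Phase 2: decode into a staging buffer so a late failure emits nothing.
        let mut staged = Vec::new();
        for r in batch {
            if let Some(bytes) = &r.edge_bytes {
                let features = decode_features(bytes)
                    .ok_or(SketchError::Decode { record_sequence: r.sequence })?;
                staged.push(Observation { record_sequence: r.sequence, features });
            }
        }

        // Commit: only now advance the anchor and publish observations.
        self.anchor = head;
        self.observations.extend(staged);
        Ok(batch.len())
    }
}
```

A decode failure in this sketch leaves the verification counter untouched, which keeps producer bugs off the chain-integrity alert.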
The anchor is the chain head pointer, not the records themselves.
A file-backed anchor (FileAnchorStore::load_or_unanchored) is
written atomically (tempfile-then-rename) on every successful
batch, so a crash between batches resumes from the last
successful head.
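The tempfile-then-rename pattern mentioned above can be sketched with std::fs. This is an illustration of the general technique, not FileAnchorStore's code: the function names are hypothetical, and the on-disk format (a raw 32-byte hash) is an assumption made for the example.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

// ASSUMPTION: the anchor is stored as a raw 32-byte hash. The real on-disk
// format of FileAnchorStore is not shown in this recipe.
fn write_anchor_atomically(path: &Path, head: &[u8; 32]) -> io::Result<()> {
    // Write the new head to a sibling temp file first.
    let tmp = path.with_extension("tmp");
    {
        let mut f = File::create(&tmp)?;
        f.write_all(head)?;
        // Flush file contents to disk before the rename makes them visible.
        f.sync_all()?;
    }
    // Renaming within one directory replaces the old anchor in a single step
    // on POSIX filesystems, so readers see the old head or the new one.
    fs::rename(&tmp, path)
}

// Returns Ok(None) when no anchor file exists yet (the unanchored case).
fn load_anchor(path: &Path) -> io::Result<Option<[u8; 32]>> {
    match fs::read(path) {
        Ok(bytes) => {
            if bytes.len() != 32 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "anchor file is not 32 bytes",
                ));
            }
            let mut head = [0u8; 32];
            head.copy_from_slice(&bytes);
            Ok(Some(head))
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```

A crash before the rename leaves the previous anchor file intact, which is what lets a restarted consumer resume from the last successful head.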
Failure Modes
- Anchor mismatch: the batch's first record's prev_event_hash did not match the persisted anchor. Anchor unchanged. Returns PipelineError::Ingest(IngestError::AnchorMismatch { expected, actual }). Usually means: re-ingesting an already-collected window, or a producer-side restart that skipped records.
- Hash linkage / tamper: a record's current_event_hash does not equal SHA-256(canonical_bytes), or its prev_event_hash does not equal the prior record's current_event_hash. Anchor unchanged. Returns PipelineError::Ingest(IngestError::LinkageFailure { failing_index }) naming the 0-indexed offset.
- Malformed edge bytes: EdgeRecord::decode rejected the embedded edge_record_bytes. The chain hash still verified (the bytes-as-blob were sealed correctly) but they aren't a valid EdgeRecord shape. Returns PipelineError::EdgeRecordDecode { record_sequence, edge_id, message }. Anchor unchanged, no observations emitted.
- JSONL parse error: malformed JSON, missing required fields, or I/O. Returns PipelineError::Ingest(IngestError::Parse(_)). Distinct from chain_verification_failures on the metrics surface so transport / serialization failures don't alert on the chain-integrity channel.
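An operator wiring these failures to alerts might route them with a single match. The sketch below mirrors the variant names listed above in hypothetical local enums (`PipelineErrorSketch`, `IngestErrorSketch`); the real definitions, field types, and the `AlertChannel` split are assumptions. Routing AnchorMismatch to the chain-integrity channel is a choice made for this example, since the text does not say which counter it increments.

```rust
// Hypothetical mirrors of the error variants named above; field types
// are guesses and not the crates' real definitions.
#[derive(Debug)]
enum IngestErrorSketch {
    AnchorMismatch { expected: [u8; 32], actual: [u8; 32] },
    LinkageFailure { failing_index: usize },
    Parse(String),
}

#[derive(Debug)]
enum PipelineErrorSketch {
    Ingest(IngestErrorSketch),
    EdgeRecordDecode { record_sequence: u64, edge_id: String, message: String },
}

// Hypothetical alert routing targets.
#[derive(Debug, PartialEq)]
enum AlertChannel {
    ChainIntegrity, // possible tamper or gap: page someone
    ProducerBug,    // sealed bytes verified but did not decode
    Transport,      // parse or I/O trouble: retry or fix the feed
}

fn classify(err: &PipelineErrorSketch) -> AlertChannel {
    match err {
        // ASSUMPTION: an anchor mismatch is treated as an integrity event
        // here, even though it is often just a re-ingested window.
        PipelineErrorSketch::Ingest(IngestErrorSketch::AnchorMismatch { .. }) => {
            AlertChannel::ChainIntegrity
        }
        PipelineErrorSketch::Ingest(IngestErrorSketch::LinkageFailure { .. }) => {
            AlertChannel::ChainIntegrity
        }
        PipelineErrorSketch::Ingest(IngestErrorSketch::Parse(_)) => AlertChannel::Transport,
        PipelineErrorSketch::EdgeRecordDecode { .. } => AlertChannel::ProducerBug,
    }
}
```

Keeping the match exhaustive (no wildcard arm) means a new error variant fails compilation until someone decides where it should alert.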
Tests
Run it with:
    cargo test -p cookbook-recipe-55-consuming-audit-chain

The tests cover the happy path (3 records ingested, 2 EdgeRewritten observations decoded), tamper detection (no partial emit), malformed edge bytes (fail-closed despite valid chain hash), and a file-backed anchor surviving a "process restart" (re-ingesting the same batch fails with an anchor mismatch).
Adaptation Notes
- Anchor storage: swap FileAnchorStore for any AnchorStore impl. A production collector typically writes the anchor to a small durable record in its own state store.
- Transport: the collector exposes ingest_records for callers consuming a non-JSONL transport (Kafka, gRPC, S3 object). The verify + anchor-advance discipline is the same.
- Filtering: this recipe filters on AuditEventData::EdgeRewritten. A SIEM consumer interested in lease churn would filter on record.kind == AuditEventKind::LeaseRevoked (or any of the 27 typed kinds; see grafos admin audit-query --kind for the full list).
- Signature verification: each AuditRecord carries an Option<signature> over current_event_hash. The reference collector ignores the signature today (matching the development-mode NullSigner producer); production callers add a signer-side check before accepting chain_verification_failures == 0 as proof of integrity.
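To illustrate the anchor-storage swap, here is a sketch of an in-memory store behind a small trait. The trait shape is HYPOTHETICAL: the real grafos_audit::AnchorStore signature is not shown in this recipe and may differ, and `MemoryAnchorStore` and `advance_anchor` are names invented for the example. Accepting any first batch when unanchored is also an assumption.

```rust
// HYPOTHETICAL trait shape; the real AnchorStore trait may look different.
trait AnchorStoreSketch {
    fn load(&self) -> Option<[u8; 32]>;
    fn store(&mut self, head: [u8; 32]) -> Result<(), String>;
}

// In-memory store, useful in tests or as a model for a row in a state store.
struct MemoryAnchorStore {
    head: Option<[u8; 32]>,
}

impl AnchorStoreSketch for MemoryAnchorStore {
    fn load(&self) -> Option<[u8; 32]> {
        self.head
    }

    fn store(&mut self, head: [u8; 32]) -> Result<(), String> {
        self.head = Some(head);
        Ok(())
    }
}

// Generic over the store: the anchor advances only when the batch's first
// prev hash matches the persisted head. ASSUMPTION: an unanchored store
// (None) accepts whatever batch arrives first.
fn advance_anchor<S: AnchorStoreSketch>(
    store: &mut S,
    batch_first_prev: [u8; 32],
    batch_last_current: [u8; 32],
) -> Result<(), String> {
    match store.load() {
        Some(head) if head != batch_first_prev => Err("anchor mismatch".to_string()),
        _ => store.store(batch_last_current),
    }
}
```

Because `advance_anchor` depends only on the trait, the same check runs unchanged against a file, a database row, or this in-memory stand-in, and re-submitting an already-collected batch is rejected in each case.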
See also:
- crates/grafos-audit: AuditRecord, AuditEventKind, AuditEventData, verify_chain, jsonl::read_chain.
- crates/grafos-audit-collector: the reference collector.
- docs/spec/audit-chain-canonical-bytes.md: authoritative wire-shape spec for the 8 event-data tags.
- docs/operations/siem-vocabulary-cookbook.md: SIEM queries keyed on AuditEventKind.