Recipe 10: A MapReduce Job That Finishes With Zero Cleanup
Situation
Batch jobs allocate a lot of temporary resources:
- shuffle partitions
- intermediate outputs
- worker scratch space
In many systems, cleanup is an operational afterthought, and jobs leave debris behind:
- orphaned workers
- leftover temp files
- long GC tails
In a lease-oriented system, you want a stronger invariant:
Resources exist iff the job is actively holding leases for them.
If the job completes, leases drop. If the job crashes, leases expire. Either way, intermediate artifacts disappear.
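To make the two exit paths concrete, here is a minimal sketch of a lease table driven by a logical clock. `LeaseTable` and its methods are hypothetical names used only for illustration; they are not the grafos API, and a real system would use wall-clock TTLs.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory lease table (illustration only, not the grafos API).
/// Time is a logical tick counter so the sketch stays deterministic.
pub struct LeaseTable {
    now: u64,
    next_id: u64,
    // lease id -> tick at which the lease expires
    expires_at: HashMap<u64, u64>,
}

impl LeaseTable {
    pub fn new() -> Self {
        LeaseTable { now: 0, next_id: 0, expires_at: HashMap::new() }
    }

    /// Acquire a lease that stays live for `ttl` ticks unless renewed.
    pub fn acquire(&mut self, ttl: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.expires_at.insert(id, self.now + ttl);
        id
    }

    /// Renew a live lease; returns false if it already expired or was released.
    pub fn renew(&mut self, id: u64, ttl: u64) -> bool {
        let now = self.now;
        match self.expires_at.get_mut(&id) {
            Some(deadline) => {
                *deadline = now + ttl;
                true
            }
            None => false,
        }
    }

    /// Completion path: the holder gives the lease back.
    pub fn release(&mut self, id: u64) {
        self.expires_at.remove(&id);
    }

    /// Crash path: nobody renews, the clock moves on, and every lease
    /// whose deadline has passed is reclaimed.
    pub fn advance(&mut self, ticks: u64) {
        self.now += ticks;
        let now = self.now;
        self.expires_at.retain(|_, deadline| *deadline > now);
    }

    /// Number of leases currently live.
    pub fn live(&self) -> usize {
        self.expires_at.len()
    }
}
```

Calling `release` for every lease models a clean finish; calling only `advance` models a crashed holder. In both cases `live()` ends at zero.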
What You Build
A MapReduce-like flow:
- mappers lease memory for input buffers and intermediate partitions
- shuffle uses leased memory or block storage for partition exchange
- reducers lease memory for aggregation and write final output to block storage
All intermediate state is leased.
Building Blocks
- MemBuilder / MemLease for intermediate memory
- BlockBuilder for intermediate or final durable output
- CpuBuilder if you run map/reduce in tasklets (optional)
- grafos_batch::{TaskGraph, Executor, ExecutionPlan} for DAG-structured execution
- grafos_jobs::{JobCoordinator, RetryPolicy} for idempotent retry
- grafos_observe for lease count graphs
Design
DAG Structure via TaskGraph
Model the MapReduce job as a directed acyclic graph using TaskGraph:
- map tasks form the first wave (no dependencies)
- reduce tasks depend on their input map tasks
- a final aggregation task depends on all reduce tasks
TaskGraph validates the DAG (no cycles, all dependencies exist) and produces an ExecutionPlan
made of waves. Waves run in order, and the tasks within a wave can run in parallel.
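How a DAG turns into waves can be sketched with a level-by-level version of Kahn's algorithm. The `Dag` type below is a hypothetical stand-in written against the standard library only; it is not how TaskGraph is implemented, just one plausible way to get the same shape of result.

```rust
/// Hypothetical stand-in for a task graph; tasks are dense indices 0..n.
pub struct Dag {
    // edges[i] lists the tasks that depend on task i
    edges: Vec<Vec<usize>>,
}

impl Dag {
    pub fn new(num_tasks: usize) -> Self {
        Dag { edges: vec![Vec::new(); num_tasks] }
    }

    /// Record that `after` may only start once `before` has finished.
    pub fn add_dependency(&mut self, before: usize, after: usize) -> Result<(), String> {
        let n = self.edges.len();
        if before >= n || after >= n {
            return Err(format!("unknown task in edge {} -> {}", before, after));
        }
        self.edges[before].push(after);
        Ok(())
    }

    /// Group tasks into waves: wave 0 has no dependencies, wave k+1 holds
    /// tasks whose last unmet dependency finished in wave k.
    /// Returns an error if a cycle keeps some task from ever becoming ready.
    pub fn waves(&self) -> Result<Vec<Vec<usize>>, String> {
        let n = self.edges.len();
        let mut pending = vec![0usize; n]; // unmet dependency count per task
        for targets in &self.edges {
            for &t in targets {
                pending[t] += 1;
            }
        }
        let mut current: Vec<usize> = (0..n).filter(|&t| pending[t] == 0).collect();
        let mut waves = Vec::new();
        let mut scheduled = 0;
        while !current.is_empty() {
            let mut next = Vec::new();
            for &t in &current {
                for &dependent in &self.edges[t] {
                    pending[dependent] -= 1;
                    if pending[dependent] == 0 {
                        next.push(dependent);
                    }
                }
            }
            scheduled += current.len();
            waves.push(current);
            current = next;
        }
        if scheduled == n {
            Ok(waves)
        } else {
            Err("cycle detected".to_string())
        }
    }
}
```

With tasks 0 and 1 as mappers, 2 as the reducer, and 3 as the final aggregation, the dependencies 0 -> 2, 1 -> 2, 2 -> 3 yield three waves: the two mappers, then the reducer, then the aggregation.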
Intermediate Storage Choice
If reducers need to read mapper outputs after mappers finish, use:
- block leases for partitions, or
- long-TTL memory leases (with renewal by coordinator)
The coordinator can “own” renewal: each mapper writes its partition into a lease owned by the job, not by the worker process, so the partition outlives the worker that produced it.
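The ownership split can be illustrated without any lease machinery. In the sketch below, the hypothetical `JobPartitions` plays the role of job-owned storage and scoped threads play the role of mapper workers; these names and the summing "reducer" are assumptions made for the example.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical job-owned partition store: the coordinator owns the buffers,
/// workers only borrow them while they run.
pub struct JobPartitions {
    slots: Vec<Mutex<Vec<u64>>>,
}

impl JobPartitions {
    pub fn new(num_partitions: usize) -> Self {
        JobPartitions {
            slots: (0..num_partitions).map(|_| Mutex::new(Vec::new())).collect(),
        }
    }

    /// Called by a mapper: append one record to a partition.
    pub fn write(&self, partition: usize, value: u64) {
        self.slots[partition].lock().unwrap().push(value);
    }

    /// Called by a reducer: sum everything written to a partition.
    pub fn sum(&self, partition: usize) -> u64 {
        self.slots[partition].lock().unwrap().iter().sum()
    }
}

/// Run mappers to completion, then reduce. The mapper threads are gone
/// before any reducer reads, yet the partitions are still there because
/// the coordinator (this function) owns `store`.
/// `num_partitions` must be greater than zero.
pub fn run_job(chunks: &[Vec<u64>], num_partitions: usize) -> Vec<u64> {
    let store = JobPartitions::new(num_partitions);
    thread::scope(|s| {
        for chunk in chunks {
            let store = &store;
            s.spawn(move || {
                for &v in chunk {
                    store.write((v as usize) % num_partitions, v);
                }
            });
        }
    }); // all mapper threads have exited here
    let totals = (0..num_partitions).map(|p| store.sum(p)).collect();
    // `store` is dropped on return: intermediate state goes away with its owner.
    totals
}
```

If each mapper owned its own output instead, the data would vanish when the worker exited, which is exactly what the job-owned lease avoids.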
Deterministic Partitioning
Partition function:
partition = hash(key) % num_partitions
Store partitions separately so reducers can pull their partition directly.
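A sketch of the partition function, assuming FNV-1a as the hash. The recipe does not name a hash function; FNV-1a is swapped in here because its output is fixed by the algorithm, so every worker and every retry maps a key to the same partition. The function names are illustrative.

```rust
/// 64-bit FNV-1a over a byte string.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// partition = hash(key) % num_partitions
pub fn partition_for(key: &str, num_partitions: usize) -> usize {
    assert!(num_partitions > 0, "need at least one partition");
    (fnv1a64(key.as_bytes()) % num_partitions as u64) as usize
}

/// Split records into one bucket per partition so that a reducer can
/// pull exactly its own bucket.
pub fn partition_records(
    records: &[(&str, u64)],
    num_partitions: usize,
) -> Vec<Vec<(String, u64)>> {
    let mut buckets = vec![Vec::new(); num_partitions];
    for &(key, value) in records {
        buckets[partition_for(key, num_partitions)].push((key.to_string(), value));
    }
    buckets
}
```

All records with the same key land in the same bucket, which is the property the reducers rely on.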
Cleanup Invariant
Intermediate partitions are leases. The “cleanup” is simply:
- drop the lease handles at the end
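In Rust, "drop the lease handles" maps naturally onto the `Drop` trait. The sketch below uses a hypothetical `PartitionLease` type and a shared counter in place of a real lease service; it only illustrates that releasing can be tied to the handle's lifetime.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical lease handle: holding the value is holding the lease.
pub struct PartitionLease {
    live: Arc<AtomicUsize>,
    pub data: Vec<u8>,
}

impl PartitionLease {
    pub fn acquire(live: &Arc<AtomicUsize>, size: usize) -> Self {
        live.fetch_add(1, Ordering::SeqCst);
        PartitionLease { live: Arc::clone(live), data: vec![0; size] }
    }
}

impl Drop for PartitionLease {
    // Release is tied to the handle's lifetime, so there is no separate
    // cleanup routine that an early return or error path could skip.
    fn drop(&mut self) {
        self.live.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Acquire `n` intermediate leases, then let them go.
/// Returns (live count while held, live count after the handles are dropped).
pub fn run_and_count(n: usize) -> (usize, usize) {
    let live = Arc::new(AtomicUsize::new(0));
    let peak = {
        let leases: Vec<PartitionLease> =
            (0..n).map(|_| PartitionLease::acquire(&live, 1024)).collect();
        assert_eq!(leases.len(), n);
        live.load(Ordering::SeqCst)
    }; // `leases` goes out of scope here; every handle's Drop runs
    (peak, live.load(Ordering::SeqCst))
}
```

The job body never calls a cleanup function; leaving the scope that holds the handles is the cleanup.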
Walkthrough (Implementation Sketch)
1. Define the Task Graph
use grafos_batch::{TaskGraph, Executor};
let mut graph = TaskGraph::new();
// Wave 1: map tasks
let map_a = graph.add_task(map_task_def("chunk_a"))?;
let map_b = graph.add_task(map_task_def("chunk_b"))?;
// Wave 2: the reduce task depends on both mappers
let reduce = graph.add_task(reduce_task_def())?;
graph.add_dependency(map_a, reduce)?;
graph.add_dependency(map_b, reduce)?;
graph.validate()?;
let plan = graph.build();
2. Execute Waves
// Waves execute in order: all map tasks first, then reduce.
// Within a wave, tasks run in parallel.
// Inspect the waves before calling run(plan), in case run consumes the plan.
for wave in plan.waves() {
    // Each wave contains independent tasks.
}
let executor = Executor::new();
let result = executor.run(plan)?;
3. Retry Failed Tasks
Use JobCoordinator for retry logic on individual chunks:
use grafos_jobs::{JobCoordinator, RetryPolicy, Backoff};
let policy = RetryPolicy::default()
    .with_max_retries(3)
    .with_backoff(Backoff::exponential(100, 5000));
let mut coordinator = JobCoordinator::new(policy);
let result = coordinator.run(
    map_chunks,
    partition_store,
    |chunk, store| { /* map one chunk */ Ok(()) },
    |results| { /* aggregate */ Ok(results) },
)?;
4. Coordinator Drops Intermediate Leases
When the job completes, the coordinator drops all intermediate leases. If the coordinator crashes instead, nothing drops them explicitly; they expire when their TTL elapses.
Failure Modes
- Mapper dies: coordinator retries mapper work; existing partial partitions can be discarded by dropping their leases.
- Reducer dies: retry reducer; partitions are still present (if stored in block leases, or in memory leases that the coordinator keeps renewing).
- Coordinator dies: TTL expiry cleans up intermediate leases; re-run job from scratch or from durable checkpoints.
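The mapper-retry case can be sketched as a small retry loop with capped exponential backoff. This is a standalone illustration, not the JobCoordinator implementation: `backoff_ms` and `run_with_retry` are hypothetical helpers, and the delay formula (base doubled per attempt, clamped to a ceiling) is an assumption about what an exponential policy with two millisecond arguments might mean.

```rust
use std::thread;
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
/// Assumption: the real policy may compute its delays differently.
pub fn backoff_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(cap_ms)
}

/// Run `task` until it succeeds or `max_retries` retries are used up.
/// `task` receives the attempt number and must be idempotent, because it
/// may run several times for the same chunk.
pub fn run_with_retry<T, E, F>(
    max_retries: u32,
    base_ms: u64,
    cap_ms: u64,
    mut task: F,
) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    let mut attempt = 0;
    loop {
        match task(attempt) {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= max_retries => return Err(err),
            Err(_) => {
                thread::sleep(Duration::from_millis(backoff_ms(attempt, base_ms, cap_ms)));
                attempt += 1;
            }
        }
    }
}
```

Idempotence matters here: a retried mapper should write into a fresh lease (or overwrite deterministically) so that a half-written partition from the failed attempt never reaches a reducer.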
Observability
The “meta-proof” is a lease count graph:
- leases ramp up during map
- plateau during shuffle
- ramp down during reduce
- hit zero at end
This gives operators a direct check: a job left nothing behind exactly when its lease count returns to zero, so “no leftovers” can be asserted by observing leases alone.
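The check can be written down as code. The sketch below replays a stream of lease events into a count series and tests the "ends at zero" property; `LeaseEvent`, `lease_series`, and `no_leftovers` are hypothetical names, and how grafos_observe actually exposes lease counts is not assumed here.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum LeaseEvent {
    Acquired,
    Released, // dropped by the holder or expired by TTL
}

/// Replay events into a time series of live-lease counts.
/// Returns None if a release is seen with no lease outstanding.
pub fn lease_series(events: &[LeaseEvent]) -> Option<Vec<usize>> {
    let mut live = 0usize;
    let mut series = Vec::with_capacity(events.len());
    for event in events {
        live = match event {
            LeaseEvent::Acquired => live + 1,
            LeaseEvent::Released => live.checked_sub(1)?,
        };
        series.push(live);
    }
    Some(series)
}

/// "No leftovers": the job held at least one lease and ended with none.
pub fn no_leftovers(series: &[usize]) -> bool {
    series.iter().any(|&n| n > 0) && series.last() == Some(&0)
}
```

An assertion like `no_leftovers` could run at the end of an integration test or as an alert condition on the lease count metric.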
Variations
- Add Durable checkpoints so the coordinator can resume without recomputing the map phase.
- Use FabricQueue for streaming shuffle where possible.