Recipe 10: A MapReduce Job That Finishes With Zero Cleanup

Situation

Batch jobs allocate a lot of temporary resources:

  • shuffle partitions
  • intermediate outputs
  • worker scratch space

In many systems, cleanup is an operational afterthought:

  • orphaned workers
  • leftover temp files
  • long GC tails

In a lease-oriented system, you want a stronger invariant:

Resources exist iff the job is actively holding leases for them.

If the job completes, leases drop. If the job crashes, leases expire. Either way, intermediate artifacts disappear.
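The two exit paths can be sketched with a toy lease table. This is only an illustration under stated assumptions: `LeaseTable`, `grant`, `release`, `sweep`, and `live` are hypothetical names rather than the grafos API, and time is a plain tick counter so the behavior stays deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory lease table (not the grafos API).
/// Time is a logical tick counter supplied by the caller.
#[derive(Default)]
pub struct LeaseTable {
    next_id: u64,
    // lease id -> tick at which the lease expires
    expires_at: HashMap<u64, u64>,
}

impl LeaseTable {
    /// Grant a lease that lives until `now + ttl` unless released earlier.
    pub fn grant(&mut self, now: u64, ttl: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.expires_at.insert(id, now + ttl);
        id
    }

    /// Completion path: the holder hands the lease back explicitly.
    pub fn release(&mut self, id: u64) {
        self.expires_at.remove(&id);
    }

    /// Crash path: nobody releases, so anything at or past its deadline is reclaimed.
    pub fn sweep(&mut self, now: u64) {
        self.expires_at.retain(|_, deadline| *deadline > now);
    }

    /// Number of leases currently held.
    pub fn live(&self) -> usize {
        self.expires_at.len()
    }
}
```

A job that finishes calls `release` for each lease; a job that crashes never does, and `sweep` removes its leases once their deadlines pass. In both cases `live()` returns to zero.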

What You Build

A MapReduce-like flow:

  • mappers lease memory for input buffers and intermediate partitions
  • shuffle uses leased memory or block storage for partition exchange
  • reducers lease memory for aggregation and write final output to block storage

All intermediate state is leased.

Building Blocks

  • MemBuilder / MemLease for intermediate memory
  • BlockBuilder for intermediate or final durable output
  • CpuBuilder if you run map/reduce in tasklets (optional)
  • grafos_batch::{TaskGraph, Executor, ExecutionPlan} for DAG-structured execution
  • grafos_jobs::{JobCoordinator, RetryPolicy} for idempotent retry
  • grafos_observe for lease count graphs

Design

DAG Structure via TaskGraph

Model the MapReduce job as a directed acyclic graph using TaskGraph:

  • map tasks form the first wave (no dependencies)
  • reduce tasks depend on their input map tasks
  • a final aggregation task depends on all reduce tasks

TaskGraph validates the DAG (no cycles, all dependencies exist) and produces an ExecutionPlan with waves that can be executed in parallel.
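How a validated DAG turns into waves can be illustrated with Kahn's algorithm. The sketch below is an assumption about the general technique, not a description of how TaskGraph is implemented; `plan_waves` and its integer task ids are hypothetical.

```rust
/// Group tasks `0..n` into waves using Kahn's algorithm.
/// `deps` holds `(before, after)` pairs: `after` may only run once `before` is done.
/// Returns an error for an unknown task id or a cycle.
pub fn plan_waves(n: usize, deps: &[(usize, usize)]) -> Result<Vec<Vec<usize>>, String> {
    let mut indegree = vec![0usize; n];
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(before, after) in deps {
        if before >= n || after >= n {
            return Err(format!("dependency ({before}, {after}) names an unknown task"));
        }
        indegree[after] += 1;
        dependents[before].push(after);
    }

    let mut waves = Vec::new();
    // The first wave is every task with no dependencies.
    let mut ready: Vec<usize> = (0..n).filter(|&t| indegree[t] == 0).collect();
    let mut scheduled = 0;
    while !ready.is_empty() {
        scheduled += ready.len();
        let mut next = Vec::new();
        for &task in &ready {
            for &d in &dependents[task] {
                indegree[d] -= 1;
                if indegree[d] == 0 {
                    next.push(d);
                }
            }
        }
        next.sort_unstable(); // stable ordering inside a wave
        waves.push(ready);
        ready = next;
    }

    // Tasks that never became ready sit on a cycle.
    if scheduled != n {
        return Err("cycle detected".to_string());
    }
    Ok(waves)
}
```

With two mappers (tasks 0 and 1) feeding one reducer (task 2), `plan_waves(3, &[(0, 2), (1, 2)])` yields the waves `[0, 1]` and `[2]`.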

Intermediate Storage Choice

If reducers need to read mapper outputs after mappers finish, use:

  • block leases for partitions, or
  • long-TTL memory leases (with renewal by coordinator)

The coordinator can “own” renewal: each mapper writes its partition into a lease owned by the job, not by the worker process, so the partition outlives the mapper that produced it.
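Job-owned renewal might look like the sketch below. `JobLease`, its fields, and the tick-based clock are hypothetical and only meant to show the ownership idea: the lease records a job id, the coordinator extends the deadline, and a worker exiting changes nothing.

```rust
/// Hypothetical lease record owned by a job rather than by a worker process.
#[derive(Debug, Clone, PartialEq)]
pub struct JobLease {
    pub owner_job: u64,
    pub expires_at: u64, // logical ticks
}

impl JobLease {
    pub fn new(owner_job: u64, now: u64, ttl: u64) -> Self {
        JobLease { owner_job, expires_at: now + ttl }
    }

    /// True while the deadline has not been reached.
    pub fn is_live(&self, now: u64) -> bool {
        now < self.expires_at
    }

    /// Called by the coordinator on each heartbeat.
    /// Refuses to revive a lease that has already expired.
    pub fn renew(&mut self, now: u64, ttl: u64) -> bool {
        if !self.is_live(now) {
            return false;
        }
        self.expires_at = now + ttl;
        true
    }
}
```

If the coordinator stops calling `renew`, the lease lapses on its own, which is the crash behavior the recipe relies on.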

Deterministic Partitioning

Partition function:

  • partition = hash(key) % num_partitions

Store partitions separately so reducers can pull their partition directly.
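A minimal sketch of the partition function follows. It assumes FNV-1a as the hash, because every mapper process must compute the same value for the same key; a randomly seeded hasher such as the one behind Rust's default `HashMap` would break that. The names `fnv1a64`, `partition_for`, and `partition_all` are illustrative, and the source does not say which hash grafos uses.

```rust
/// FNV-1a, 64-bit: a simple hash with fixed constants, so the result
/// is identical in every process and on every run.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// partition = hash(key) % num_partitions
pub fn partition_for(key: &str, num_partitions: u64) -> u64 {
    assert!(num_partitions > 0, "need at least one partition");
    fnv1a64(key.as_bytes()) % num_partitions
}

/// Split keys into one bucket per partition so each reducer can pull its own.
pub fn partition_all<'a>(keys: &[&'a str], num_partitions: u64) -> Vec<Vec<&'a str>> {
    let mut buckets = vec![Vec::new(); num_partitions as usize];
    for &key in keys {
        buckets[partition_for(key, num_partitions) as usize].push(key);
    }
    buckets
}
```

Reducer `i` would then read only `buckets[i]`, or in the leased setting, only the lease that holds partition `i`.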

Cleanup Invariant

Intermediate partitions are leases. The “cleanup” is simply:

  • drop the lease handles at the end
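In Rust, "drop the handles" maps naturally onto the `Drop` trait. The sketch below is a stand-in, not MemLease or any other grafos type: `PartitionLease` is hypothetical, and a shared counter plays the role of the lease service so the effect of dropping is visible.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical lease handle: the lease counts as live while this value exists.
pub struct PartitionLease {
    live: Rc<Cell<usize>>,
}

impl PartitionLease {
    /// Acquire a lease and bump the shared live-lease counter.
    pub fn acquire(live: &Rc<Cell<usize>>) -> Self {
        live.set(live.get() + 1);
        PartitionLease { live: Rc::clone(live) }
    }
}

impl Drop for PartitionLease {
    /// Dropping the handle is the whole cleanup step.
    fn drop(&mut self) {
        self.live.set(self.live.get() - 1);
    }
}

/// Toy job: lease one partition per slot, then drop them all.
/// Returns the peak number of live leases.
pub fn run_job(live: &Rc<Cell<usize>>, num_partitions: usize) -> usize {
    let leases: Vec<PartitionLease> = (0..num_partitions)
        .map(|_| PartitionLease::acquire(live))
        .collect();
    let peak = live.get();
    // Map, shuffle, and reduce would read and write through `leases` here.
    drop(leases);
    peak
}
```

After `run_job` returns, the counter is back at zero without any explicit cleanup pass. An early return or a panic that unwinds would drop the vector the same way.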

Walkthrough (Implementation Sketch)

1. Define the Task Graph

use grafos_batch::{TaskGraph, Executor};
let mut graph = TaskGraph::new();
// Wave 1: map tasks
let map_a = graph.add_task(map_task_def("chunk_a"))?;
let map_b = graph.add_task(map_task_def("chunk_b"))?;
// Wave 2: reduce tasks depend on mappers
let reduce = graph.add_task(reduce_task_def())?;
graph.add_dependency(map_a, reduce)?;
graph.add_dependency(map_b, reduce)?;
graph.validate()?;
let plan = graph.build();

2. Execute Waves

let executor = Executor::new();
// Inspect the plan before running it: run(plan) as written takes the plan by value.
// Waves execute in order: all map tasks first, then reduce.
// Within a wave, tasks run in parallel.
for wave in plan.waves() {
    // Each wave contains independent tasks.
}
let result = executor.run(plan)?;

3. Retry Failed Tasks

Use JobCoordinator for retry logic on individual chunks:

use grafos_jobs::{JobCoordinator, RetryPolicy, Backoff};
let policy = RetryPolicy::default()
    .with_max_retries(3)
    .with_backoff(Backoff::exponential(100, 5000));
let mut coordinator = JobCoordinator::new(policy);
let result = coordinator.run(
    map_chunks,
    partition_store,
    |chunk, store| { /* map one chunk */ Ok(()) },
    |results| { /* aggregate */ Ok(results) },
)?;
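The retry behavior that such a policy implies can be sketched with the standard library alone. This is a guess at the semantics, not the grafos_jobs implementation: it assumes the two arguments to Backoff::exponential are a base delay and a cap, both in milliseconds, and `backoff_delay` and `retry` are hypothetical helpers.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
/// Assumes millisecond units for both `base_ms` and `cap_ms`.
pub fn backoff_delay(attempt: u32, base_ms: u64, cap_ms: u64) -> Duration {
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    Duration::from_millis(base_ms.saturating_mul(factor).min(cap_ms))
}

/// Run `op` until it succeeds or `max_retries` retries are used up.
/// `sleep` is injected so callers can pass `std::thread::sleep` in
/// production and a recorder in tests.
pub fn retry<T, E>(
    max_retries: u32,
    base_ms: u64,
    cap_ms: u64,
    mut op: impl FnMut(u32) -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error.
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => {
                sleep(backoff_delay(attempt, base_ms, cap_ms));
                attempt += 1;
            }
        }
    }
}
```

Because a chunk may be mapped more than once, the map closure should be idempotent: writing the same partition twice must leave the same result as writing it once.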

4. Coordinator Drops Intermediate Leases

When the job completes, the coordinator drops every intermediate lease handle. If the coordinator crashes instead, nothing drops the handles explicitly; the leases stop being renewed and lapse when their TTL expires.

Failure Modes

  • Mapper dies: coordinator retries mapper work; existing partial partitions can be discarded by dropping their leases.
  • Reducer dies: retry reducer; partitions are still present (if stored in block leases).
  • Coordinator dies: TTL expiry cleans up intermediate leases; re-run job from scratch or from durable checkpoints.

Observability

The “meta-proof” is a lease count graph:

  • leases ramp up during map
  • plateau during shuffle
  • ramp down during reduce
  • hit zero at end

This gives operators a direct check: if the lease count is zero after the job ends, nothing was left behind.

Variations

  • Add durable checkpoints so the coordinator can resume without recomputing the map phase.
  • Use FabricQueue for streaming shuffle where possible.