MapReduce Paper Reading Notes

Jeffrey Dean and Sanjay Ghemawat, Google

Motivation

Google’s workload means input data is often huge; computation must be spread across hundreds or thousands of machines to finish in reasonable time. How to parallelize, partition data, and handle failures becomes central.

To address this, the authors designed an abstraction that lets users express the simple computations they want to perform while the library hides the messy details of parallelization, fault tolerance, data partitioning, and load balancing. The programming model is inspired by the map and reduce primitives of Lisp.

Programming model

Computation takes a set of input key/value pairs and produces a set of output key/value pairs.

The MapReduce library expresses this with user-defined Map and Reduce functions.

Map

The user-written Map function consumes an input pair and emits a set of intermediate key/value pairs. The MapReduce library groups all intermediate values by key and passes them to Reduce.

Reduce

The user-written Reduce function accepts an intermediate key and a list of values for that key. It merges those values into a smaller set—often zero or one output value per call. Values are supplied via an iterator so the list can exceed memory.
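The paper's canonical example is word count. A minimal Python sketch of the model (the paper gives C++ pseudocode; the `run` driver here is an illustrative stand-in for the library's grouping step, not the real implementation):

```python
from collections import defaultdict
from typing import Iterator

def map_fn(key: str, value: str) -> Iterator[tuple[str, int]]:
    # key: document name (unused here), value: document contents.
    # Emits one intermediate (word, 1) pair per occurrence.
    for word in value.split():
        yield (word, 1)

def reduce_fn(key: str, values: Iterator[int]) -> int:
    # key: a word; values: iterator over partial counts for that word.
    return sum(values)

def run(inputs: dict[str, str]) -> dict[str, int]:
    # Stand-in for the library: group intermediate values by key,
    # then apply reduce_fn once per unique key.
    groups: dict[str, list[int]] = defaultdict(list)
    for k, v in inputs.items():
        for ik, iv in map_fn(k, v):
            groups[ik].append(iv)
    return {k: reduce_fn(k, iter(vs)) for k, vs in sorted(groups.items())}
```

Note that `reduce_fn` takes an iterator, matching the paper's point that the value list for one key may not fit in memory.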

Implementation

Map invocations are distributed across many machines. Input is split into M pieces processed in parallel. Intermediate keys are partitioned into R regions consumed by Reduce; R is a user-chosen parameter.

  • The library first splits input into M pieces (often 16–64 MB each). Many copies of the program are started on a cluster.
  • One copy is special—the master. The rest are workers assigned work by the master. There are M map tasks and R reduce tasks. The master assigns idle workers a map or reduce task.
  • A worker running map reads its split, parses key/value pairs, and passes them to the user’s Map function. Intermediate pairs are buffered in memory.
  • Periodically, buffered pairs are written to local disk, partitioned into R regions by a partitioning function. Locations of these buffered pairs are sent back to the master, which forwards them to reduce workers.
  • When a reduce worker is notified, it reads buffered data from map workers’ disks via RPC. After reading all intermediate data for its partition, it sorts by intermediate key so all values for a key are contiguous. Sorting is needed because many keys map to the same reduce task. External sort is used if data does not fit in memory.
  • The reduce worker walks the sorted data; for each unique key it passes the key and value list to the user’s Reduce function. Reduce output is appended to a final output file for this partition.
  • When all map and reduce tasks finish, the master wakes the user program and the MapReduce call returns.
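The flow above can be condensed into a sequential single-machine sketch. This is an assumption-laden model, not the real system: in-memory lists stand in for local-disk regions and final GFS files, and `map_fn`/`reduce_fn` are user-supplied callables.

```python
import hashlib
from collections import defaultdict

def partition(key: str, R: int) -> int:
    # Deterministic stand-in for hash(key) mod R (md5 so results are
    # stable across processes, unlike Python's salted built-in hash).
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % R

def mapreduce(splits, map_fn, reduce_fn, R):
    # Map phase: one "task" per input split, output partitioned R ways
    # (regions[r] models the r-th local-disk region of each map task).
    regions = [defaultdict(list) for _ in range(R)]
    for split_id, split in enumerate(splits):
        for k, v in map_fn(split_id, split):
            regions[partition(k, R)][k].append(v)
    # Reduce phase: sort each region by intermediate key so values for
    # one key are contiguous, then reduce each key, producing one
    # output "file" (a list of pairs) per reduce task.
    outputs = []
    for region in regions:
        outputs.append([(k, reduce_fn(k, iter(vs)))
                        for k, vs in sorted(region.items())])
    return outputs
```

Running word count through it with R = 2 yields two partitioned output lists, mirroring the R output files described below.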

On success, output lives in R files (one per reduce task, filenames user-specified). Users usually feed these files into another MapReduce job or another distributed app that accepts partitioned input rather than merging into one file.

Master data structures

The master tracks state (idle, in-progress, completed) and worker identity for each map/reduce task.

It is also the conduit from map outputs to reduce tasks: for each completed map task it stores the R intermediate file regions’ locations and sizes. Updates arrive when map tasks finish and are pushed incrementally to reduce workers that need them.

Fault tolerance

MapReduce targets hundreds or thousands of machines, so it must tolerate failures gracefully.

Worker failure

The master pings workers periodically. If a worker does not respond within a timeout, it is marked failed.

Both completed and in-progress map tasks on that worker are reset to idle and become eligible for rescheduling, because map output lived on the failed machine's local disk and is now unreachable. In-progress reduce tasks on the worker are also reset to idle. Completed reduce tasks need not rerun; their output is in the global filesystem.

If map task A’s worker fails and the task is re-run on B, reduce workers that have not yet read A’s output read from B instead.

MapReduce tolerates large-scale worker loss: during one job, network maintenance took groups of ~80 machines offline for several minutes at a time; the master simply re-executed their work and the job eventually completed.
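The reset rules above can be sketched as a single master-side handler (data shapes are assumptions; the real master also re-pings before declaring failure):

```python
def handle_worker_failure(worker, map_tasks, reduce_tasks):
    """Apply the failure rules: completed and in-progress map tasks on
    the failed worker return to idle (their output lived on its local
    disk); in-progress reduce tasks also return to idle; completed
    reduce output is already safe in the global filesystem."""
    for t in map_tasks:
        if t["worker"] == worker and t["state"] in ("in_progress", "completed"):
            t["state"], t["worker"] = "idle", None
    for t in reduce_tasks:
        if t["worker"] == worker and t["state"] == "in_progress":
            t["state"], t["worker"] = "idle", None
```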

Master failure

The master can checkpoint its data structures periodically and resume from the last checkpoint. With a single master, failure is deemed rare; the described implementation aborts the job if the master dies—the client may retry.

Semantics under failures

When Map and Reduce are deterministic functions of their inputs, distributed execution matches the output of a sequential fault-free run.

This relies on atomic commit of task outputs: in-flight tasks write to private temporary files. A reduce task writes one such file; a map task writes R files (one per reduce partition). On map completion, the worker sends the R temporary filenames to the master; if the master has already recorded a completion for that map task, the duplicate message is ignored, otherwise the filenames are recorded.

When reduce completes, the worker renames its temp file to the final name. If the same reduce runs on multiple machines, multiple renames target the same final file; the filesystem’s atomic rename ensures the final state reflects exactly one successful reduce.
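A minimal sketch of that commit pattern on a local filesystem (the paper relies on the analogous atomic-rename guarantee in the underlying global filesystem; this stand-in uses `os.replace`, the portable atomic rename-with-overwrite):

```python
import os
import tempfile

def commit_reduce_output(final_path: str, records: list[str]) -> None:
    # Write to a private temp file in the same directory (rename is
    # only atomic within one filesystem), then commit in one step.
    dir_name = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dir_name)
    with os.fdopen(fd, "w") as f:
        for rec in records:
            f.write(rec + "\n")
    os.replace(tmp_path, final_path)  # atomic commit
```

If duplicate executions of the same reduce task race, each rename replaces the file as a whole, so the final file holds exactly one execution's complete output, never an interleaving.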

Most operators are deterministic, so this matches sequential reasoning. With non-deterministic operators, the output of a particular reduce task R1 matches the output some sequential execution would produce for R1, but a different reduce task R2 may correspond to a different sequential ordering. This is a weaker guarantee, but still a reasonable one in practice.

Locality

Network bandwidth is scarce. Input is stored in GFS on cluster machines; GFS splits files into 64 MB blocks with ~3 replicas on different machines. The master tries to schedule map tasks on machines that hold a replica of the input split; otherwise it picks a worker near the data (e.g. same switch). For large jobs, most input is read locally, saving bandwidth.

Granularity of tasks

Ideally M and R are much larger than the number of workers: having each worker perform many tasks improves dynamic load balancing and speeds recovery after failure, since a failed worker's many completed map tasks can be re-executed spread across all the other machines. Practical bounds apply, since the master makes O(M + R) scheduling decisions and keeps O(M × R) state in memory. In practice the authors choose M so each task handles roughly 16–64 MB of input, keep R a small multiple of the worker count, and routinely run jobs with ~2,000 workers, M ≈ 200,000, R ≈ 5,000.

Backup tasks

Stragglers, the few tasks that take far longer than the rest, often dominate job completion time. Causes include a slow disk, CPU contention from other scheduled jobs, or even a machine-initialization bug that disabled processor caches.

Near job end, the master schedules backup executions of still-running tasks. The task is considered done when either primary or backup finishes. This adds only a few percent extra compute but cuts large-job completion time substantially.

Refinements

Partition function

Users choose R and a partition function on intermediate keys. Default is hash-based (hash(key) mod R) for balance. Sometimes keys are URLs and all URLs for one host should land in one file—e.g. hash(Hostname(urlkey)) mod R. Users can supply custom partitioners.
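Both partitioners can be sketched in a few lines. The hostname extraction via `urlparse` and the simple stable string hash are assumptions standing in for whatever hash and `Hostname()` helper the library actually uses:

```python
from urllib.parse import urlparse

def default_partition(key: str, R: int) -> int:
    # hash(key) mod R. Python's built-in hash is salted per process,
    # so a simple stable polynomial string hash is used instead.
    h = 0
    for ch in key:
        h = (h * 31 + ord(ch)) & 0xFFFFFFFF
    return h % R

def host_partition(url_key: str, R: int) -> int:
    # All URLs from one host land in the same reduce partition,
    # and therefore in the same output file.
    return default_partition(urlparse(url_key).netloc, R)
```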

Ordering

Within a partition, intermediate key/value pairs are processed in increasing key order, enabling sorted output files useful for random access or downstream convenience.

Combiner

When a map emits many repeated intermediate keys and Reduce is associative and commutative (e.g. word counts), a combiner can partially merge map-side before network transfer—often the same code as Reduce, but output goes to intermediate files for reduce rather than final output.
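A word-count sketch of map-side combining: instead of shipping one ("the", 1) pair per occurrence, each map task ships one ("the", n) pair. The combiner here reuses the reduce logic (summation), as the text notes is common; the function shape is an illustrative assumption.

```python
from collections import Counter

def map_with_combiner(text: str) -> list[tuple[str, int]]:
    # Raw map output: one (word, 1) pair per occurrence.
    emitted = ((word, 1) for word in text.split())
    # Combine in memory before the pairs would be written to local
    # disk and shuffled: same merge as reduce, applied map-side.
    combined = Counter()
    for k, v in emitted:
        combined[k] += v
    return sorted(combined.items())
```

For skewed keys (common words), this can shrink intermediate data by orders of magnitude before it ever touches the network.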

Input/output types

The library supports reading input in several formats: text mode treats each line as a key/value pair (key = offset in the file, value = line contents), and another common format stores a sequence of key/value pairs sorted by key. Each input type knows how to split itself into meaningful ranges for map tasks. Users can add new input types by implementing a simple reader interface; readers need not read from files at all (e.g. they can read records from a database or from in-memory data structures). Output types work analogously.

Side effects

Users sometimes emit auxiliary files from Map/Reduce. The library expects app writers to make side effects atomic and idempotent (write temp, rename when complete). Atomic two-phase commit across multiple output files from one task is not supported; such tasks should be deterministic—in practice this rarely matters.

Skipping bad records

Buggy user code may deterministically crash on certain records and block the whole job. An optional mode detects offending records (signal handlers + “last gasp” UDP with record sequence to the master) and skips them after repeated failures—acceptable for some analytics workloads.
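The retry-then-skip policy can be sketched sequentially. The record indexing, retry threshold, and restart-from-the-failing-record behavior are assumptions simplifying the real mechanism (where the whole task restarts and the master tracks crash reports per record):

```python
def run_with_skipping(records, process, max_failures=2):
    # failures[i]: how many crashes the "master" has seen for record i
    # (reported via the task's last-gasp message before dying).
    failures = {}
    skipped, results = [], []
    i = 0
    while i < len(records):
        if failures.get(i, 0) >= max_failures:
            skipped.append(i)  # next re-execution skips this record
            i += 1
            continue
        try:
            results.append(process(records[i]))
            i += 1
        except Exception:
            failures[i] = failures.get(i, 0) + 1
            # The real system restarts the whole task; here we just
            # retry from the failing record.
    return results, skipped
```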

Local execution

Debugging distributed jobs is hard. An alternate implementation runs MapReduce sequentially on one machine for debugging and small tests, with flags to restrict to specific map tasks and use gdb etc.

Status pages

The master serves HTTP status: progress counts, bytes, rates, links to stderr/stdout per task—useful for ETA, scaling decisions, and diagnosing failures.