Combiner Functions

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function’s output forms the input to the reduce function. Because the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.
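The contract can be checked outside Hadoop with a small simulation. The sketch below is plain Java (Java 9 or later, for `List.of`) and uses no Hadoop classes. The names `CombinerContract`, `combine`, and `runJob` are hypothetical, and the temperatures are illustrative values for a single key split across two maps. It runs a max combiner a chosen number of times on each map's output and confirms that the reducer's answer does not change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java simulation (no Hadoop classes) of the combiner
// contract for a single key: the reducer's result must not depend on how
// many times the combiner ran on each map's output.
class CombinerContract {

    // Reduce step: the maximum of all values received for the key.
    static int reduce(List<Integer> values) {
        return Collections.max(values);
    }

    // Combiner: same logic as reduce, but it emits a list so that its output
    // can be fed back into the combiner or passed on to the reducer.
    static List<Integer> combine(List<Integer> values) {
        return List.of(Collections.max(values));
    }

    // Runs the combiner timesPerMap[i] times on map i's output, concatenates
    // the results (standing in for the shuffle), and reduces them.
    static int runJob(List<List<Integer>> mapOutputs, int[] timesPerMap) {
        List<Integer> shuffled = new ArrayList<>();
        for (int i = 0; i < mapOutputs.size(); i++) {
            List<Integer> out = mapOutputs.get(i);
            for (int t = 0; t < timesPerMap[i]; t++) {
                out = combine(out);
            }
            shuffled.addAll(out);
        }
        return reduce(shuffled);
    }

    public static void main(String[] args) {
        // Illustrative temperatures for one year, split across two maps.
        List<List<Integer>> mapOutputs = List.of(List.of(0, 20, 10), List.of(25, 15));
        int[][] schedules = {{0, 0}, {1, 1}, {0, 1}, {3, 2}};
        for (int[] schedule : schedules) {
            System.out.println(Arrays.toString(schedule) + " -> " + runJob(mapOutputs, schedule));
        }
    }
}
```

Every schedule, including running the combiner on one map but not the other, yields 25, which is exactly the zero-one-or-many guarantee the reducer needs.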

Figure 2-5. MapReduce data flow with no reduce tasks

The contract for the combiner function constrains the type of function that may be used. This is best illustrated with an example. Suppose that for the maximum temperature example, readings for the year 1950 were processed by two maps (because they were in different splits). Imagine the first map produced the output:

(1950, 0)
(1950, 20)
(1950, 10)

and the second produced:

(1950, 25)
(1950, 15)

The reduce function would be called with a list of all the values:

(1950, [0, 20, 10, 25, 15])

with output:

(1950, 25)

since 25 is the maximum value in the list. We could use a combiner function that, just like the reduce function, finds the maximum temperature for each map output. The reduce function would then be called with:

(1950, [20, 25])

and would produce the same output as before. More succinctly, we may express the function calls on the temperature values in this case as follows:

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

Not all functions possess this property.[20] For example, if we were calculating mean temperatures, we couldn’t use the mean as our combiner function, because:

mean(0, 20, 10, 25, 15) = 14

but:

mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
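Both identities can be checked directly. This plain-Java sketch (Java 16 or later, for the record type) applies max and mean to all of the 1950 readings at once, and then per map followed by a second pass over the partial results. The `SumCount` record is an addition not taken from the text: it shows one common way to make an average combinable, by passing (sum, count) pairs through the combiner and dividing only in the reducer. All class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: does f(all values) equal f(f(map 1), f(map 2))?
class MaxVersusMean {

    static double max(double... xs) {
        return Arrays.stream(xs).max().getAsDouble();
    }

    static double mean(double... xs) {
        return Arrays.stream(xs).average().getAsDouble();
    }

    // Not from the text: a partial aggregate that can be merged any number of
    // times, so it is safe to compute in a combiner. Only the reducer divides.
    record SumCount(double sum, long count) {
        static SumCount of(double... xs) {
            return new SumCount(Arrays.stream(xs).sum(), xs.length);
        }

        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double mean() {
            return sum / count;
        }
    }

    public static void main(String[] args) {
        double[] map1 = {0, 20, 10};
        double[] map2 = {25, 15};
        double[] all = {0, 20, 10, 25, 15};

        System.out.println(max(all));                          // 25.0
        System.out.println(max(max(map1), max(map2)));         // 25.0
        System.out.println(mean(all));                         // 14.0
        System.out.println(mean(mean(map1), mean(map2)));      // 15.0
        System.out.println(SumCount.of(map1).merge(SumCount.of(map2)).mean()); // 14.0
    }
}
```

Merging (sum, count) pairs is commutative and associative, the same property that lets max serve as its own combiner; the mean of means is not, which is why it gives 15 instead of 14.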

The combiner function doesn’t replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can help cut down the amount of data shuffled between the mappers and the reducers, and for this reason alone it is always worth considering whether you can use a combiner function in your MapReduce job.
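To make the saving concrete for the 1950 example, this plain-Java sketch (Java 16 or later) counts the (year, temperature) records each map would send to the reducers, with and without a per-map max combiner. It uses no Hadoop classes and counts records rather than bytes, so it is only a stand-in for real shuffle volume; `ShuffleSize`, `Pair`, and `shuffledRecords` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: records shuffled with and without a max combiner.
class ShuffleSize {

    record Pair(int year, int temp) {}

    // Per-map combiner: keep only the maximum temperature seen for each year.
    static List<Pair> combine(List<Pair> mapOutput) {
        Map<Integer, Integer> maxByYear = new TreeMap<>();
        for (Pair p : mapOutput) {
            maxByYear.merge(p.year(), p.temp(), Integer::max);
        }
        List<Pair> out = new ArrayList<>();
        maxByYear.forEach((year, temp) -> out.add(new Pair(year, temp)));
        return out;
    }

    // Total number of records leaving all maps, optionally after combining.
    static int shuffledRecords(List<List<Pair>> mapOutputs, boolean useCombiner) {
        int total = 0;
        for (List<Pair> mapOutput : mapOutputs) {
            total += useCombiner ? combine(mapOutput).size() : mapOutput.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<Pair>> mapOutputs = List.of(
                List.of(new Pair(1950, 0), new Pair(1950, 20), new Pair(1950, 10)),
                List.of(new Pair(1950, 25), new Pair(1950, 15)));
        System.out.println("without combiner: " + shuffledRecords(mapOutputs, false)); // 5
        System.out.println("with combiner:    " + shuffledRecords(mapOutputs, true));  // 2
    }
}
```

The reducer still has to take the maximum of the two combined records, which is why the combiner cannot replace it.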

Specifying a combiner function

Going back to the Java MapReduce program, the combiner function is defined using the Reducer class, and for this application, it is the same implementation as the reduce function in MaxTemperatureReducer. The only change we need to make is to set the combiner class on the Job (see Example 2-6).

Example 2-6. Application to find the maximum temperature, using a combiner function for efficiency

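import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// MaxTemperatureMapper and MaxTemperatureReducer are the mapper and reducer
// of the original max temperature program; they are not repeated here.
public class MaxTemperatureWithCombiner {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCombiner <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance(); // replaces the deprecated new Job()
        job.setJarByClass(MaxTemperatureWithCombiner.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // The reducer doubles as the combiner: max is commutative and associative.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}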
