Ruby

The map function can be expressed in Ruby as shown in Example 2-7.

Example 2-7. Map function for maximum temperature in Ruby

#!/usr/bin/env ruby
STDIN.each_line do |line|
    val = line
    year, temp, q = val[15,4], val[87,5], val[92,1]
    puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

The program iterates over lines from standard input by executing a block for each line from STDIN (a global constant of type IO). The block pulls out the relevant fields from each input line and, if the temperature is valid, writes the year and the temperature separated by a tab character, \t, to standard output (using puts).
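
To make the fixed-width offsets concrete, the short sketch below slices a synthetic line padded to the same layout (the record content is invented for illustration; real NCDC records carry many more fields than this):

#!/usr/bin/env ruby
# Hypothetical record, padded so that the year starts at offset 15,
# the temperature at offset 87, and the quality code at offset 92
line = " " * 15 + "1950" + " " * 68 + "+0022" + "1"
year, temp, q = line[15,4], line[87,5], line[92,1]
puts "year=#{year} temp=#{temp} q=#{q}"
# prints: year=1950 temp=+0022 q=1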

NOTE
It’s worth drawing out a design difference between Streaming and the Java MapReduce API. The Java API is geared toward processing your map function one record at a time. The framework calls the map() method on your Mapper for each record in the input, whereas with Streaming the map program can decide how to process the input; for example, it could easily read and process multiple lines at a time since it’s in control of the reading. The user’s Java map implementation is “pushed” records, but it’s still possible to consider multiple lines at a time by accumulating previous lines in an instance variable in the Mapper.[23] In this case, you need to implement the close() method so that you know when the last record has been read and can finish processing the last group of lines.
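
As a sketch of that flexibility, a Streaming mapper that buffers its input might look like the following (the batch size and the pass-through processing are invented for illustration):

#!/usr/bin/env ruby
# Hypothetical Streaming mapper that processes input in batches.
# Hadoop only sees what the script writes to standard output, so
# the script is free to buffer its input however it likes.
BATCH_SIZE = 100  # arbitrary choice for this sketch
batch = []
process = lambda { |lines| lines.each { |l| puts l.chomp } }  # stand-in processing
STDIN.each_line do |line|
    batch << line
    if batch.size == BATCH_SIZE
        process.call(batch)
        batch.clear
    end
end
# Flush the final partial batch: the analogue of implementing
# close() in the Java API
process.call(batch) unless batch.empty?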

Because the script just operates on standard input and output, it’s trivial to test the script without using Hadoop, simply by using Unix pipes:

% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

The reduce function shown in Example 2-8 is a little more complex.

Example 2-8. Reduce function for maximum temperature in Ruby

#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
    key, val = line.split("\t")
    if last_key && last_key != key
        # Key changed: emit the maximum for the group just finished
        puts "#{last_key}\t#{max_val}"
        last_key, max_val = key, val.to_i
    else
        # Same group (or first record): update the running maximum
        last_key, max_val = key, [max_val, val.to_i].max
    end
end
# Emit the final group, since no key change follows it
puts "#{last_key}\t#{max_val}" if last_key

Again, the program iterates over lines from standard input, but this time we have to store some state as we process each key group. In this case, the keys are the years, and we store the last key seen and the maximum temperature seen so far for that key. The MapReduce framework ensures that the keys are ordered, so we know that if a key is different from the previous one, we have moved into a new key group. In contrast to the Java API, where you are provided an iterator over each key group, in Streaming you have to find key group boundaries in your program.

For each line, we pull out the key and value. Then, if we’ve just finished a group (last_key && last_key != key), we write the key and the maximum temperature for that group, separated by a tab character, before resetting the maximum temperature for the new key. If we haven’t just finished a group, we just update the maximum temperature for the current key.

The last line of the program ensures that a line is written for the last key group in the input.
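
Like the map script, the reduce script can be tested on its own with Unix pipes, as long as its input is sorted by key (the sample records here are made up for the test):

% printf '1949\t111\n1950\t0\n1950\t22\n' | \
    ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22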

We can now simulate the whole MapReduce pipeline with a Unix pipeline (which is equivalent to the Unix pipeline shown in Figure 2-1):

% cat input/ncdc/sample.txt | \
    ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \
    sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22

The output is the same as that of the Java program, so the next step is to run it using Hadoop itself.

The hadoop command doesn’t support a Streaming option; instead, you specify the Streaming JAR file along with the jar option. Options to the Streaming program specify the input and output paths and the map and reduce scripts. This is what it looks like:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input input/ncdc/sample.txt \
    -output output \
    -mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
    -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

When running on a large dataset on a cluster, we should use the -combiner option to set the combiner:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
    -input input/ncdc/all \
    -output output \
    -mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
    -combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
    -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

Note also the use of the -files option, which we need when running Streaming programs on the cluster: it ships the scripts to the machines in the cluster.
