Summary -

In this topic, we describe Hadoop Streaming along with a working example.

The MapReduce framework is written in Java, and by default it supports writing programs only in the Java programming language. However, Hadoop provides an API for writing MapReduce programs in languages other than Java.

Hadoop Streaming is a utility that comes with the Hadoop distribution. It allows users to write MapReduce programs in any programming or scripting language that can read from standard input and write to standard output, such as Python, PHP, Ruby, Perl, and Bash.
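For instance, even standard Unix utilities can serve as the mapper and reducer. The invocation below is a minimal sketch (the streaming jar name and location vary across Hadoop versions, so the paths here are placeholders): cat passes the input through unchanged and wc counts the lines, words, and bytes it receives.

$hadoop_home_path/hadoop jar hadoop_streaming_path/hadoop_streaming.jar \
   -input input_dir \
   -output output_dir \
   -mapper /bin/cat \
   -reducer /usr/bin/wc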

Working: -

The Streaming utility creates a MapReduce job, submits the job to an appropriate cluster, and monitors the progress of the job until it completes. When a script is specified for mappers, each mapper task launches the script as a separate process when the mapper is initialized. As the mapper task runs, it converts its input into lines and feeds the lines to the standard input (stdin) of the process.

The mapper collects the line-oriented outputs from the standard output (stdout) of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line is the value.

If there is no tab character in the line, the entire line is considered the key and the value is null. This default can be customized to suit specific requirements. The mapper output key/value pairs are then fed to the reducer tasks.
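To make the default rule concrete, here is a small Python sketch (illustrative only, not Hadoop's actual code) of how one output line is turned into a key/value pair:

def split_line(line):
    # Default rule: the prefix up to the first tab is the key,
    # the rest of the line is the value.
    if '\t' in line:
        key, value = line.split('\t', 1)
    else:
        # No tab: the entire line is the key and the value is null.
        key, value = line, None
    return key, value

print(split_line('hello\tworld'))  # ('hello', 'world')
print(split_line('hello'))         # ('hello', None)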

Each reducer task launches the script as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the standard input (stdin) of the process.

The reducer collects the line-oriented outputs from the standard output (stdout) of the process and converts each line into a tab-separated key/value pair. The reducer output is the final output of the MapReduce job.
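A reducer script therefore follows the same contract as the mapper: read lines from standard input, write tab-separated key/value lines to standard output. As a minimal sketch, an "identity" reducer that passes every pair through unchanged looks like this:

#!/usr/bin/env python
# Identity reducer sketch: it only demonstrates the stdin/stdout
# contract and performs no aggregation.
import sys

for line in sys.stdin:
    print(line.rstrip('\n'))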

Important Commands -

-input directory/file-name (Required): Input location for the mapper.
-output directory-name (Required): Output location for the reducer.
-mapper executable or script or JavaClassName (Required): Mapper executable.
-reducer executable or script or JavaClassName (Required): Reducer executable.
-file file-name (Optional): Makes the mapper, reducer, or combiner executable available locally on the compute nodes.
-inputformat JavaClassName (Optional): If specified, it should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default.
-outputformat JavaClassName (Optional): If specified, it should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default.
-partitioner JavaClassName (Optional): Class that determines which reducer a key is sent to.
-combiner streamingCommand or JavaClassName (Optional): Combiner executable for map output.
-inputreader (Optional): Specifies a record reader class instead of an input format class.
-verbose (Optional): Verbose output.
-lazyOutput (Optional): Creates output lazily.
-numReduceTasks (Optional): Specifies the number of reducers.
-mapdebug (Optional): Script to call when a map task fails.
-reducedebug (Optional): Script to call when a reduce task fails.
-cmdenv name=value (Optional): Passes an environment variable to streaming commands.
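For example, several of the optional parameters can be combined in a single invocation (the jar path, directories, and the DEBUG_LEVEL variable below are all placeholders): -file ships the scripts to the compute nodes, -numReduceTasks sets the reducer count, and -cmdenv exposes an environment variable to them.

$hadoop_home_path/hadoop jar hadoop_streaming_path/hadoop_streaming.jar \
   -input input_dir \
   -output output_dir \
   -mapper mapper.py \
   -reducer reducer.py \
   -file mapper.py \
   -file reducer.py \
   -numReduceTasks 2 \
   -cmdenv DEBUG_LEVEL=1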

Example Using Python -

The example below shows a word-count program written in Python, with code for the mapper phase and the reducer phase.

Mapper Phase code: -

#!/usr/bin/env python
import sys
 
#--- get all lines from stdin ---
for line in sys.stdin:
    #--- remove leading and trailing whitespace---
    line = line.strip()

    #--- split the line into words ---
    words = line.split()

    #--- output tuples [word, 1] in tab-delimited format---
    for word in words: 
        print('%s\t%s' % (word, "1"))

Make sure the above program has execute permission:

chmod +x /Hadoop_path/mapper.py
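Because the mapper only reads standard input and writes standard output, it can be tested locally without a Hadoop cluster, for example:

echo "hello world hello" | /Hadoop_path/mapper.py

This should print each word followed by a tab and "1", one pair per line.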

Reducer Phase code: -

#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {} 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # skip blank lines, then parse the input we got from mapper.py
    if not line:
        continue
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        continue

    try:
        word2count[word] = word2count[word] + count
    except KeyError:
        word2count[word] = count
 
# write the tuples to stdout
# Note: they are unsorted
for word, count in word2count.items():
    print('%s\t%s' % (word, count))

Make sure the above program has execute permission:

chmod +x /Hadoop_path/reducer.py
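The full job can likewise be simulated locally by piping the mapper output into the reducer, with sort standing in for Hadoop's shuffle-and-sort phase:

echo "hello world hello" | /Hadoop_path/mapper.py | sort | /Hadoop_path/reducer.py

This should print "hello" with a count of 2 and "world" with a count of 1.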

Execution of WordCount Program: -

$hadoop_home_path/hadoop jar hadoop_streaming_path/hadoop_streaming.jar \
   -input input_dir \
   -output output_dir \
   -mapper <path/mapper.py> \
   -reducer <path/reducer.py>

Where "\" is used for line continuation for clear readability.