Summary -

In this topic, we describe partitioners in Hadoop MapReduce with a detailed example.

The partitioner controls how the outputs of the map stage are distributed to the reducers. It partitions the keys of the intermediate map output; the key, or a subset of the key, is used to derive the partition, typically with a hash function.

The total number of partitions is the same as the number of reduce tasks for the job. The partitioner runs on the same machine where the mapper completed its execution, consuming the mapper output; the entire mapper output passes through the partitioner.

The partitioner splits the mapper output into as many groups as there are reduce tasks. By default, the Hadoop framework uses a hash-based partitioner (HashPartitioner), which partitions the key space using the hash code of the key.

Default hash partitioner syntax -

public int getPartition(K key, V value, int numReduceTasks) {
	return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
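
For illustration, here is a standalone sketch (plain Java, not part of Hadoop; the class name HashPartitionDemo is just an example) that applies the same formula to the sample keys used later in this topic, assuming two reduce tasks. Note that Hadoop applies hashCode() to the Writable key (for example Text), whose hash code differs from String's, so the exact partition numbers in a real job may differ; the point is only how the formula maps keys onto partitions.

public class HashPartitionDemo {

	// Same formula as the default hash partitioner shown above.
	static int partitionFor(String key, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}

	public static void main(String[] args) {
		// Sample keys from the word count input below, spread over 2 reducers.
		for (String key : new String[] { "aa", "bb", "cc", "dd", "ee", "ff" }) {
			System.out.println(key + " -> partition " + partitionFor(key, 2));
		}
	}
}

With String.hashCode(), all six of these two-letter keys happen to hash to an even value, so with two reducers they would all land in partition 0. This is one reason a custom partitioner, as shown below, can give a more even or more meaningful split.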

Hadoop also allows you to plug in a custom partitioner. The Partitioner abstract class declares a single method, getPartition, and can be extended to write a custom partitioner.

public abstract class Partitioner<KEY, VALUE> {
	public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

Parameters -

  • key - the key to be partitioned.
  • value - the entry value.
  • numPartitions - the total number of partitions.

Uses -

  • Writing a custom partitioner (a brief wiring sketch follows after the note below).
  • Optimizing MapReduce programs so that the given problem is executed as efficiently as possible.
  • Dividing the output across output files as required.
Note! A custom partitioner should be used only when the situation requires it; there is no need for one in every MapReduce program.
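
In the word count example that follows, the custom partitioner is plugged into the job inside the driver's run() method with just these two calls (shown here as a preview; the full driver is listed below):

jobex.setPartitionerClass(WordcountExamplepartitioner.class);
jobex.setNumReduceTasks(2);

Setting the number of reduce tasks tells the framework how many partitions getPartition() must map keys onto.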

Example - word count with a custom partitioner

Input - aa bb cc dd ee aa ff bb cc dd ee ff

Save the input as input.txt and copy it into HDFS:

$vim input.txt
aa bb cc dd ee aa ff bb cc dd ee ff
$hadoop fs -mkdir -p /user/user-name/wc/input
$hadoop fs -put input.txt /user/user-name/wc/input/

Program (each class below goes in its own .java file under the package org.example.wordcount):

package org.example.wordcount;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordcountExample extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new WordcountExample(), args);
		System.exit(exitCode);
	}

	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.out.println("Usage: [input] [output]");
			return -1;
		}
		Job jobex = Job.getInstance(getConf());
		jobex.setJobName("wordcount");
		jobex.setJarByClass(WordcountExample.class);
		jobex.setOutputKeyClass(Text.class);
		jobex.setOutputValueClass(IntWritable.class);
		jobex.setMapperClass(WordcountExampleMapper.class);
		jobex.setCombinerClass(WordcountExampleReducer.class);
		jobex.setPartitionerClass(WordcountExamplepartitioner.class);
		jobex.setNumReduceTasks(2);
		jobex.setReducerClass(WordcountExampleReducer.class);
		jobex.setInputFormatClass(TextInputFormat.class);
		jobex.setOutputFormatClass(TextOutputFormat.class);
		Path inputFilePath = new Path(args[0]);
		Path outputFilePath = new Path(args[1]);
		/* Accept the input directory recursively */
		FileInputFormat.setInputDirRecursive(jobex, true);
		FileInputFormat.addInputPath(jobex, inputFilePath);
		FileOutputFormat.setOutputPath(jobex, outputFilePath);
		/* Delete the output path if it already exists */
		FileSystem fs = FileSystem.newInstance(getConf());
		if (fs.exists(outputFilePath)) {
			fs.delete(outputFilePath, true);
		}
		return jobex.waitForCompletion(true) ? 0 : 1;
	}
}
public class WordcountExampleMapper extends Mapper<LongWritable, 
								Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();
	public void map(LongWritable key, Text value, Context context) 
				throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.write(word, one);
		}
	}
}
public class WordcountExamplepartitioner extends
				Partitioner<Text, IntWritable> {
	@Override
	public int getPartition(Text key, IntWritable value, int numPartitions) {
		if (numPartitions == 2) {
			String partitionKey = key.toString();
			// Keys starting after 'b' go to reducer 0, the rest to reducer 1.
			if (partitionKey.charAt(0) > 'b')
				return 0;
			else
				return 1;
		} else if (numPartitions == 1) {
			return 0;
		} else {
			System.err.println("WordcountExamplepartitioner can only handle either 1 or 2 partitions");
			return 0;
		}
	}
}
public class WordcountExampleReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {
	private IntWritable totalWordCount = new IntWritable();
	public void reduce(final Text key, final Iterable<IntWritable> values,
			final Context context) throws IOException, InterruptedException {
		int totalcount = 0;
		Iterator<IntWritable> iterator = values.iterator();
		while (iterator.hasNext()) {
			totalcount += iterator.next().get();
		}
		totalWordCount.set(totalcount);
		context.write(key, totalWordCount);
	}
}
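
The program can be packaged into a jar and submitted with the hadoop command. The jar name wordcount.jar and the output path below are only examples; adjust them to your environment:

$hadoop jar wordcount.jar org.example.wordcount.WordcountExample /user/user-name/wc/input /user/user-name/wc/output
$hadoop fs -cat /user/user-name/wc/output/part-r-*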

Expected Result (with the custom partitioner and two reduce tasks, cc, dd, ee, and ff are written to part-r-00000 and aa and bb to part-r-00001; combined, the output is):

aa	2
bb	2
cc	2
dd	2
ee	2
ff	2