Introduction to the Map-Reduce Programming Model


(This article assumes a basic working knowledge of Java.)

The MapReduce programming paradigm is based on the concept of key-value pairs, and it provides a powerful model for parallel data processing. To process data with MapReduce, you need to map the given input and the expected output into the MapReduce paradigm; that is, both input and output need to be expressed as sets of key-value pairs. A single key-value pair is also referred to as a record.

For example, suppose you have a text file ‘input.txt’ with 100 lines of text in it, and you want to find the frequency of occurrence of each word in the file. Each line in input.txt is considered a value, and the offset of that line from the start of the file is considered a key; together, (offset, line) is an input key-value pair. To count how many times a word occurs in input.txt, a single word is the output key and the frequency of that word is the output value. So our input key-value pair is (offset of a line, line) and our output key-value pair is (word, frequency of word).

A Map-Reduce job is divided into four simple phases: 1. Map phase, 2. Combine phase, 3. Shuffle phase, and 4. Reduce phase. In our word count example, the Combine and Reduce phases perform the same operation of aggregating word frequencies. Now, let’s look at how each phase is implemented using sample code. Each phase takes key-value pairs as input and emits one or more key-value pairs as output. Generally, the Map phase explodes the input records: from one input key-value pair you create one or more output key-value pairs. The Combine and Reduce phases condense the input key-value pairs into a smaller number of key-value pairs.

1. Map phase.
The map function operates on a single record at a time. For each input key-value pair (LongWritable key, Text value), the MapReduce framework calls the map function with the key and value as arguments, as shown below. Here, the value is a complete line of text. To count the words in a line, you first need to split the line into words; you can use Java’s StringTokenizer or write your own function. The output of this phase is a set of key-value pairs, where a word is the key and the frequency 1 is the value. An output key-value pair is emitted using context.write(word, one). LongWritable, Text, and IntWritable are wrappers for the corresponding basic datatypes in Java; the MapReduce framework provides these wrappers to handle serialization and deserialization of key-value records.

public static class LineToWordMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Split the line of text into words and emit (word, 1) for each one.
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

Example of input key-value pair to map function:
<0, “this is an introduction to map reduce programming. map, reduce and combine are the phases of map reduce programming”>

Example of output key-value pairs from map function:

<this, 1>
<is, 1>
<an, 1>
<introduction, 1>
<to, 1>
<map, 1>
<reduce, 1>
<programming, 1>
<map, 1>
<reduce, 1>
<and, 1>
<combine, 1>
<are, 1>
<the, 1>
<phases, 1>
<of, 1>
<map, 1>
<reduce, 1>
<programming, 1>

2. Combine phase.
The combiner applies the reducer logic early, on the output of a single map task. The mapper’s output is collected into an in-memory buffer; the MapReduce framework sorts this buffer and runs the combiner on it, if you have provided one, and the combiner’s output is then written to disk. For the combiner code, refer to the reducer code below; the sketch after this paragraph shows how it is wired into the job.
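Reusing the reducer as the combiner works here because summing counts is associative and commutative, so partial sums computed on the map side are still valid inputs to the final reduce. A minimal sketch of the wiring, using the class names from this article:

// Run the reducer logic on each map task's output before the shuffle.
// This is only safe because addition can be applied in any order.
job.setCombinerClass(FrequencyReducer.class);
job.setReducerClass(FrequencyReducer.class);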

3. Shuffle phase.
In the shuffle phase, MapReduce partitions the map output and sends it to the reducers. Each mapper sends one partition to each reducer. Partitions are created by a Partitioner provided by the MapReduce framework: for each key-value pair, the Partitioner decides which reducer it should be sent to. All the records for the same key are sent to a single reducer, as the sketch below illustrates.
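Hadoop’s default partitioner, HashPartitioner, chooses the reducer by hashing the key. Below is a minimal sketch of a custom partitioner with the same logic; the class name WordPartitioner is ours, for illustration only.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Records with the same key always hash to the same partition number,
// so they all end up at the same reducer.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}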

4. Reduce phase.
During initialization of the reduce phase, each reducer copies its input partition from the output of each mapper. After copying all parts, the reducer merges them and sorts all input records by key. The reduce function is then executed once for each key found in the sorted output: the MapReduce framework collects all the values for a key into a list and passes that list, along with the key, to the reduce function. In the example below, the reduce function adds up all the input frequencies for a word to produce a single result frequency, the total frequency of that word across all input documents. Notice that because all the records for a key are sent to a single reducer, only one reducer will output the frequency for a given word; the same word won’t be present in the output of the other reducers.

public static class FrequencyReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Sum all the partial counts for this word and emit the total.
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

Example output of Reduce Phase:

<this, 1>
<is, 1>
<an, 1>
<introduction, 1>
<to, 1>
<map, 3>
<reduce, 3>
<programming, 2>
<and, 1>
<combine, 1>
<are, 1>
<the, 1>
<phases, 1>
<of, 1>


Putting it all together into a Map-Reduce job:
Here is the complete Java code for the word count MapReduce job. Notice how we instruct the job to use the same class as both combiner and reducer by setting job.setCombinerClass and job.setReducerClass to FrequencyReducer.class. The mapper’s input key and value types are decided by the input format you use. Here we use job.setInputFormatClass(TextInputFormat.class), which reads one record of (LongWritable key, Text value) at a time. TextInputFormat is a reader provided by the MapReduce framework to read text files from HDFS; similarly, TextOutputFormat is a writer provided by the MapReduce framework to write text files to HDFS.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyWordCountJob {

  // Mapper: reads one line of text at a time and emits a (word, 1) pair
  // for every word on the line.
  public static class LineToWordMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also used as the combiner): sums the partial counts for a word.
  public static class FrequencyReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int freq = 0;
      for (IntWritable val : values) {
        freq += val.get();
      }
      result.set(freq);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "MyWordCountJob");
    job.setJarByClass(MyWordCountJob.class);
    job.setMapperClass(LineToWordMapper.class);
    job.setCombinerClass(FrequencyReducer.class);
    job.setReducerClass(FrequencyReducer.class);
    // Set input and output formats.
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // Set output key and value classes.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
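To run the job, compile the class against the Hadoop libraries, package it into a jar, and submit it with the hadoop command. A minimal sketch, assuming Hadoop is installed and input.txt has already been copied to HDFS; the jar name and the HDFS paths are placeholders:

# Compile and package (paths and jar name are illustrative).
hadoop com.sun.tools.javac.Main MyWordCountJob.java
jar cf wordcount.jar MyWordCountJob*.class

# Submit the job; args[0] is the input path and args[1] the output directory.
hadoop jar wordcount.jar MyWordCountJob /user/hadoop/input.txt /user/hadoop/output

Note that the output directory must not exist before the job runs; the framework creates it and writes one part file per reducer.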
