In this Hadoop tutorial we will modify our previous WordCount program, which already used a custom mapper and reducer, by implementing a custom record reader. Before we attack the problem, let us look at some theory needed to understand the topic.
So far in this series of articles we have seen how to create a MapReduce program without writing an explicit mapper or reducer, and in the second part we wrote WordCount with our own custom mapper and reducer.
(The InputFormat theory below is referenced from the Yahoo! Hadoop tutorial.)
The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats.
More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.
Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called “splits” and are encapsulated in instances of the InputSplit interface. Most files, for example, are split up on the boundaries of the underlying blocks in HDFS, and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on application-specific data. Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. When dividing the data into input splits, it is important that this process be quick and cheap. The data itself should not need to be accessed to perform this process (as it is all done by a single machine at the start of the MapReduce job).
So, in a nutshell, the InputFormat does two tasks:
- Divide the data source (the data files) into fragments, each of which is sent to one map task. These fragments are called splits.
- Divide each split further into records, and provide these records one at a time to the mapper for processing. This is done by a class called a RecordReader.
We will concentrate on customizing #2 above; customizing #1 will be left for a future article. By customizing the record reader as in #2, we gain the power to send any kind of record — multi-line chunks, XML sections, JSON objects — to the mapper after reading it from the source text files. The sketch below shows where these two responsibilities live in the API.
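For orientation, the sketch below restates that contract as it appears in Hadoop's new MapReduce API (org.apache.hadoop.mapreduce.InputFormat declares exactly these two abstract methods). The class name SketchInputFormat and the comments are illustrative only, not the actual Hadoop source:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative restatement of the InputFormat contract (mirrors org.apache.hadoop.mapreduce.InputFormat).
public abstract class SketchInputFormat<K, V> {

    // Task #1: carve the input data into splits, one per map task.
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Task #2: return a RecordReader that turns one split into (key, value) records for the mapper.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;
}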
Okay. Now that we understand how the mapper is fed data from the source files, let's look at what we will try to achieve in this article's example program.
Problem : We want our mapper to receive 3 records (3 lines) from the source file at a time, instead of the 1 line per record provided by default by TextInputFormat.
Approach :
- We will extend the TextInputFormat class to create our own NLinesInputFormat.
- We will also create our own RecordReader class called NLinesRecordReader where we will implement the logic of feeding 3 lines/records at a time.
- We will make a change in our driver program to use our new NLinesInputFormat class.
- To prove that we are really getting 3 lines at a time, instead of actually counting words (which we already know how to do), we will emit the number of lines we receive in each input value as the key and 1 as the value; after passing through the reducer this gives us the frequency of each distinct number of lines seen by the mappers.
Example :
Step 1 : Creating the NLinesInputFormat class as a custom InputFormat class.
This is really straightforward: we inherit from TextInputFormat and override the createRecordReader() method to return our custom record reader class, NLinesRecordReader, which we will write next. The source listing follows:
public class NLinesInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new NLinesRecordReader();
    }
}
Now that we have our new InputFormat ready, let's look at creating the custom record reader. This is a little more involved; the source code is a modified version of Hadoop's own LineRecordReader.
Step 2: Creating NLinesRecordReader class as a custom RecordReader class.
We will inherit from the RecordReader class. RecordReader has 6 abstract methods which we have to implement:
- close()
- getCurrentKey()
- getCurrentValue()
- getProgress()
- initialize()
- nextKeyValue()
The most important ones for our discussion are initialize() and nextKeyValue(). initialize() is called only once per split, so we do our setup there. nextKeyValue() is called to provide each record; here we write the logic so that we pack 3 lines into the value instead of the default 1. Here is the source listing for the class:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class NLinesRecordReader extends RecordReader<LongWritable, Text> {

    // Number of lines packed into a single value handed to the mapper.
    private final int NLINESTOPROCESS = 3;

    private LineReader in;
    private LongWritable key;
    private Text value = new Text();
    private long start = 0;
    private long end = 0;
    private long pos = 0;
    private int maxLineLength;

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);
        start = split.getStart();
        end = start + split.getLength();
        boolean skipFirstLine = false;
        FSDataInputStream filein = fs.open(split.getPath());
        // If the split does not start at the beginning of the file, back up one byte
        // and skip the partial first line: it belongs to the previous split.
        if (start != 0) {
            skipFirstLine = true;
            --start;
            filein.seek(start);
        }
        in = new LineReader(filein, conf);
        if (skipFirstLine) {
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        value.clear();
        final Text endline = new Text("\n");
        int newSize = 0;
        // Read up to NLINESTOPROCESS lines and append them, newline-separated, to the value.
        for (int i = 0; i < NLINESTOPROCESS; i++) {
            Text v = new Text();
            while (pos < end) {
                newSize = in.readLine(v, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                value.append(v.getBytes(), 0, v.getLength());
                value.append(endline.getBytes(), 0, endline.getLength());
                if (newSize == 0) {
                    break;      // nothing more to read in this split
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;      // a complete line was read; move on to the next one
                }
            }
        }
        if (newSize == 0) {
            // Nothing was read: no more records in this split.
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}
Step 3 : Change the driver to use the new InputFormat.
Now that we have the custom record reader ready, let's modify our driver to use the new input format by adding the following line of code:
job.setInputFormatClass(NLinesInputFormat.class);
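For context, here is a minimal driver sketch showing where that line fits. This is an assumption-laden illustration, not code from the original post: the class names LinesPerRecordMapper and SumReducer are hypothetical (the article does not name its mapper or reducer classes), and Job.getInstance() is the Hadoop 2.x style; on older releases new Job(conf) plays the same role.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver; only the setInputFormatClass() call is prescribed by this article.
public class LinesPerRecordDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "lines-per-record");
        job.setJarByClass(LinesPerRecordDriver.class);

        job.setMapperClass(LinesPerRecordMapper.class);  // hypothetical name for the Step 4 mapper
        job.setReducerClass(SumReducer.class);           // hypothetical name for the summing reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The change introduced in this article: feed the mapper 3 lines at a time.
        job.setInputFormatClass(NLinesInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}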
Step 4 : Change the mapper to emit the number of lines it receives each time.
Here is the listing; it's pretty self-explanatory. Only the map() function is shown here.
public void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    String lines = value.toString();
    String[] lineArr = lines.split("\n");
    int lcount = lineArr.length;
    context.write(new Text(Integer.toString(lcount)), new IntWritable(1));
}
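The reducer listing is not included in the original post. A minimal sum reducer along these lines (SumReducer is a hypothetical name, matching the driver sketch above) would turn the mapper's (line-count, 1) pairs into the frequencies shown in the sample output below:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: sums the 1s emitted per "number of lines" key.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}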
Sample Data Input :
I have used a sample input file of 10,000 lines in the following format:
Shantanu , Deo
Suruchi, Bhide
Shamika, Deo
…
Mujtaba, Ahmed
Sample Output from Reducer:
1 1
3 3333
This is because our mappers received 3,333 records of 3 lines each (9,999 lines) plus 1 final record containing the remaining single line.
I hope you found the article useful. If you liked it, please feel free to share it and leave a comment.
Thanks, and have a great day!
Comments :
Very helpful tutorial. I have a couple of questions. Lines 80-82 are to ignore empty lines, correct?
Lines 89-92 are what I don't understand.
If the input chunk is null or empty, set everything else to null.
Actually, scratch that: if the next scanned line is empty, we have reached the end of the chunk.
Hello, I ran the program above and found that it does not output the correct result like yours. Can you explain why? Thanks.
Can you give some instructions? Thank you very much. My e-mail: [email protected]
Interesting article, actually similar to what I’ve published today ^^
In case a line spans 2 distinct splits, the line will be read in Split(i) and skipped in Split(i+1). In your implementation, X lines will be read from Split(i), but only 1 line will be skipped from Split(i+1), meaning you'd probably end up with some duplicates…
A small example with 8 lines spanning 2 splits:
——
Line1
Line2
Line3
Line4
——
Line5
Line6
Line7
Line8
——
This will result in:
Line 1,Line2,Line3
Line 4,Line5,Line6
Line 6,Line7,Line8
We get a duplicate Line 6 here. Do you share the same point of view?
Should you need further details, refer to my article with some example of block boundaries.
http://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/
There should be a way to tell Split(i+1) that first X lines have been read and therefore must be ignored…
Sorry, I was not right about the issue I previously described.
Thanks to line #76 (pos < end), Hadoop makes sure a record that starts after the end of the current split is not processed. Taking the same example as before, we get:
Lines 1,2,3 from Reader#1
Line 4 from Reader#1
Lines 5,6,7 from Reader#2
…/…
OK, we don't get duplicates. However, some records won't be part of a 3-line tuple (e.g., Line 4 here).
Hello Sir
I have a question regarding Hadoop. Could you please help me? I appreciate your help very much.
I am trying to implement a bottom-up divide-and-conquer algorithm using Hadoop. In this implementation, I have a list of points as the input for the map functions. I would like to run several jobs such that the first job divides the input points into groups of 2 points each and then runs the reduce function on each group.
The second job takes the first job's output as its input. It divides the points into groups of 4 points each and runs the reduce function on them, and so on.
My question is: how can I control the grouping so that the first job divides the points into groups of 2, the second into groups of 4, the third into groups of 8, and the last job puts all the points into one group?
Thanks
Hussein
I think you got the syntax wrong:
job.setInputFormatClass(NLinesInputFormat.class); -> job.setInputFormat(NLinesInputFormat.class);
Hi,
Thanks for your article, it's really helpful. But I am new to Hadoop and I am unable to understand the code in the initialize() and nextKeyValue() methods. Could you please explain what exactly we are doing in these two methods? I only understood that we are initializing the file split and separating records per the logic. Could you please also explain the classes you have used there?
Thank you,
Vamshi
Hi, I'm new to Hadoop and trying to implement custom input through a record reader. My input file looks something like this:
6273
5328
6238
2789
I want to read each line with LineRecordReader, do some processing, and have the new inputs to the mapper be more than one, like this:
6273 -> 2312,6787,5864,9023
Just wondering if this is possible? Thanks in advance for your help.
How can I test my own custom record reader? Any sample JUnit example would be appreciated.
Thank you for this tutorial.
I am new to Hadoop and I have one question.
Lines 50-54 are what I don't understand.
If the split begins exactly at the first byte of a line, do we skip that line or not?
As I understand from the Apache documentation, split.getStart() returns the position of the first byte in the file to process; it is not necessarily the first byte of a line.
If split.getStart() > 0, you skip the entire record containing the last byte of the previous split, as it has already been processed by the previous split.
Great article. Is there a way to read two lines cumulatively? For example:
Line 1
Line 2
Line 3
Line 4
I want them to be read as:
Line 1, Line 2
Line 2, Line 3
Line 3, Line 4
and so on.
Please let me know if you know a solution for the above.
Thanks
I want to have something like this:
Input file contains:
Line 1
Line 2
Line 3
Line 4
I want it to be read by the mapper like:
Line 1, Line 2
Line 2, Line 3
Line 3, Line 4
Is there any solution for this?
Thanks a lot!
Hi,
Thanks for a helpful article! Shouldn't line 84 be >= ?
if (newSize < maxLineLength) {
break;
}
Thanks for the explanation,
Nune
Is it possible to customise the InputFormat to read data from an HDFS shell command, e.g. hdfs dfs -cat file.txt? At the moment we have to pass input folders/files, but could we 'cat' the data instead?
I appreciate your explanation and the whole article.
Can anyone provide a code base for a custom record reader to read PDF files?