Hadoop Tutorial : Custom Record Reader with TextInputFormat

In this Hadoop tutorial we will modify our previous WordCount program, which already had its own custom mapper and reducer, by implementing a concept called a custom record reader. Before we attack the problem, let us look at some theory required to understand the topic.

So far in this series of articles we have seen how to create a MapReduce program without writing an explicit mapper or reducer, and in the second part we wrote WordCount with our own custom mapper and reducer.

(The InputFormat theory below is adapted from the Yahoo! Hadoop tutorial.)

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats.

More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.

Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called “splits” and are encapsulated in instances of the InputSplit interface. Most files, for example, are split up on the boundaries of the underlying blocks in HDFS, and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on application-specific data. Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. When dividing the data into input splits, it is important that this process be quick and cheap. The data itself should not need to be accessed to perform this process (as it is all done by a single machine at the start of the MapReduce job).

So, in a nutshell, the InputFormat does 2 tasks:

  1. Divide the data source (the data files) into fragments or blocks which are sent to a mapper. These fragments are called splits.
  2. Divide each split further into records, and provide these records one at a time to the mapper for processing. This is achieved through a class called a RecordReader.

We will concentrate on customizing #2; customizing #1 will be left for one of the next articles. By customizing the record reader as in #2, we gain the immense power of sending any kind of record, XML section, or JSON object to the mapper after reading it from the source text files.
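
For orientation, these two tasks correspond directly to the two abstract methods of Hadoop's InputFormat class in the new (org.apache.hadoop.mapreduce) API. The listing below is only a simplified view of that contract for reference; see the Hadoop javadocs for the exact signatures.

// Simplified view of org.apache.hadoop.mapreduce.InputFormat (for reference only).
public abstract class InputFormat<K, V> {

    // Task #1: logically divide the input into splits, one per map task.
    // This must be quick and cheap; the data itself is not read here.
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Task #2: create the RecordReader that turns one split into
    // (key, value) records handed to the mapper one at a time.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}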

Okay. Now that we understand how the mapper is fed data from the source files, let's look at what we will try to achieve in the example program in this article.

Problem: We want our mapper to receive 3 records (3 lines) from the source file at a time, instead of the 1 line provided by default by TextInputFormat.

Approach :

  1. We will extend the TextInputFormat class to create our own NLinesInputFormat.
  2. We will also create our own RecordReader class, called NLinesRecordReader, where we will implement the logic of feeding 3 lines/records at a time.
  3. We will make a change in our driver program to use our new NLinesInputFormat class.
  4. To prove that we are really getting 3 lines at a time, instead of actually counting words (which we already know how to do), we will emit the number of lines in each mapper input as the key and 1 as the value; after going through the reducer, this gives us the frequency of each distinct line count seen by the mappers.

Example :

Step 1: Creating the NLinesInputFormat class as a custom InputFormat.

This is really straightforward: we will inherit our class from TextInputFormat and override the createRecordReader() function to return our custom record reader class, NLinesRecordReader, which we will write next. The source listing follows:

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NLinesInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new NLinesRecordReader();
    }
}

Now that we have our new InputFormat ready, let's look at creating the custom record reader. This is a little more involved; the source code is a modified version of Hadoop's own LineRecordReader.

Step 2: Creating the NLinesRecordReader class as a custom RecordReader.

We will inherit from the RecordReader class. RecordReader has 6 abstract methods which we will have to implement:

  • close()
  • getCurrentKey()
  • getCurrentValue()
  • getProgress()
  • initialize()
  • nextKeyValue()

The most important ones for our discussion are the initialize and nextKeyValue functions, which we will override. The initialize function is called only once for each split, so we do our setup there; the nextKeyValue function is called to provide the records, and this is where we write the logic to send 3 lines in the value instead of the default 1. Here is the source listing for the class:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class NLinesRecordReader extends RecordReader<LongWritable, Text> {
    private final int NLINESTOPROCESS = 3;   // number of lines fed to the mapper per record
    private LineReader in;                   // line-oriented reader over the split
    private LongWritable key;                // byte offset of the first line of the current record
    private Text value = new Text();         // the NLINESTOPROCESS lines, '\n' separated
    private long start = 0;                  // byte offset where this split starts
    private long end = 0;                    // byte offset where this split ends
    private long pos = 0;                    // current read position within the file
    private int maxLineLength;               // safety limit for a single line

@Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

@Override
    public LongWritable getCurrentKey() throws IOException,InterruptedException {
        return key;
    }

@Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

@Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        }
        else {
            return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
    }

@Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);
        start = split.getStart();
        end = start + split.getLength();
        boolean skipFirstLine = false;
        FSDataInputStream filein = fs.open(split.getPath());

        // If the split does not start at the beginning of the file, back up one byte
        // and discard the (possibly partial) first line; it belongs to the previous
        // split's reader, which reads past its own end to finish that line.
        if (start != 0) {
            skipFirstLine = true;
            --start;
            filein.seek(start);
        }
        in = new LineReader(filein, conf);
        if (skipFirstLine) {
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

@Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);                        // key = byte offset of the first line of this record
        if (value == null) {
            value = new Text();
        }
        value.clear();
        final Text endline = new Text("\n");
        int newSize = 0;
        // Read up to NLINESTOPROCESS lines and append them all into a single value.
        for (int i = 0; i < NLINESTOPROCESS; i++) {
            Text v = new Text();
            while (pos < end) {
                newSize = in.readLine(v, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                value.append(v.getBytes(), 0, v.getLength());
                value.append(endline.getBytes(), 0, endline.getLength());
                if (newSize == 0) {
                    break;                   // nothing left to read
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;                   // a complete line was read; move on to the next one
                }
            }
        }
        if (newSize == 0) {
            // The last read returned no data, so this split is exhausted.
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}

Step 3: Change the driver to use the new InputFormat

Now that we have the custom record reader ready, let's modify our driver to use the new input format by adding the following line of code:

    job.setInputFormatClass(NLinesInputFormat.class);
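
For context, here is a minimal sketch of where that line sits in a typical driver. The class names NLinesDriver, NLinesMapper, and LineCountReducer are placeholders of mine (the reducer is sketched after Step 4), not the exact code from the earlier articles in this series.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLinesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "nlines example");        // Job.getInstance(conf, ...) on newer Hadoop
        job.setJarByClass(NLinesDriver.class);

        job.setMapperClass(NLinesMapper.class);           // the mapper from Step 4 (class name assumed)
        job.setReducerClass(LineCountReducer.class);      // summing reducer (sketched after Step 4)

        job.setInputFormatClass(NLinesInputFormat.class); // <-- the new line
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}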

Step 4: Change the mapper to emit the number of lines it gets each time
Here is the listing; it's pretty self-explanatory. I am only including the map function here.

public void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException
{
    // The value now holds up to 3 lines; count them and emit (lineCount, 1).
    String lines = value.toString();
    String[] lineArr = lines.split("\n");
    int lcount = lineArr.length;
    context.write(new Text(String.valueOf(lcount)), new IntWritable(1));
}
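
For completeness, the reducer is just the usual summing reducer from the earlier WordCount articles; below is a minimal sketch of it (the class name LineCountReducer is mine, chosen to match the driver sketch above).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LineCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();                   // add up the 1s emitted by the mapper
        }
        // key = number of lines in one mapper input, value = how many such inputs were seen
        context.write(key, new IntWritable(sum));
    }
}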

Sample Data Input:

I have used a sample input file of 10,000 lines in the following format:

Shantanu , Deo
Suruchi, Bhide
Shamika, Deo
Mujtaba, Ahmed

Sample Output from Reducer:

1        1
3        3333

This is because our mapper got 3333 records of 3 lines each and one final record of 1 line (3333 × 3 + 1 = 10,000 lines).

I hope the article was useful. If you liked it, please feel free to share it, and do leave a comment.

Thanks, and have a great day!

24 thoughts on “Hadoop Tutorial : Custom Record Reader with TextInputFormat”

  1. Very helpful tutorial. I have a couple of questions. Lines 80-82 are to ignore empty lines, correct?
    Lines 89-92 are what I don’t understand.

      1. Hello, I ran the program above, but I find that it does not output the correct result like yours. Can you explain why? Thanks.

  2. Interesting article, actually similar to what I’ve published today ^^
    In case a line spans over 2 distinct Splits, line will be read in Split(i) and will be skipped in Split(i+1). In your implementation, X lines will be read from Split(i), but only 1 line will be skipped from Split(i+1), meaning that you’d probably end up with some duplicates…
    Small example with 8 Lines spanned over 2 Splits

    ——
    Line1
    Line2
    Line3
    Line4
    ——
    Line5
    Line6
    Line7
    Line8
    ——

    Will result in

    Line 1,Line2,Line3
    Line 4,Line5,Line6
    Line 6,Line7,Line8

    We get a duplicate Line 6 here.. Do you share same point of view ?
    Should you need further details, refer to my article with some example of block boundaries.
    http://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/

    There should be a way to tell Split(i+1) that first X lines have been read and therefore must be ignored…

    1. Sorry I was not right with the issue I previously described.
      Thanks to line #76 (pos < end), Hadoop makes sure a record that starts after end of current Split is not processed. Taking same example as before, we get

      Lines 1,2,3 from Reader#1
      Line 4 from Reader#1
      Lines 5,6,7 from Reader#2
      …/…

      OK we don't get duplicates. However some records won't be part of a 3-lines tuple (e.g. Line 4 here)

  3. Hello Sir
    I have a question regarding Hadoop, Could you please help me ? I appreciate your help very much
    I am trying to implement bottom up divide and conquer algorithm using Hadoop. In this implementation, I have a list of points as the input for the map functions. I would like to run several jobs in a way that first job divides the input points into groups at each group 2 points and then run reduce function on each group.

    The second job takes the first job output as its input. The second job will divide the points into groups at each group 4 points and run the reduce function on them. and so on.

    My question is, how I can control the grouping operation so the first job divides the points into 2 points groups, the second job divides the points into 4 points groups, the third job divides the points into 8 points group, and the last job divide the points in one group that contains all the points.

    Thanks

    Hussein

  4. I think you got the syntax wrong:
    job.setInputFormatClass(NLinesInputFormat.class); -> job.setInputFormat(NLinesInputFormat.class);

  5. Hi,

    Thanks for your article; it's really helpful. But I am new to Hadoop and I am unable to understand the code in the initialize() and nextKeyValue() methods. Could you please explain what exactly we are doing in these two methods? I only got that we are initializing the file split and separating records as per the logic. Could you please also explain the classes that you have used there?

    Thank you,
    Vamshi

  6. Hi, I’m new to hadoop and trying to implement the custom input through recordreader. I have the input file something like this
    6273
    5328
    6238
    2789

    I want to read each line by LineRecordReader and do some process and the new inputs to mapper will be more than one like this
    6273 -> 2312,6787,5864,9023

    Just wondering if this is possible ? thanks in advance for your help.

  7. Thank you for this tutorial.
    I am new in hadoop and I have one question.
    Lines 50-54 is what I don’t understand.

    If the first part of split begins with first byte of line (not split), do we skip it or not?
    As I know from the Apache documents, split.getStart() returns the position of the first byte in the file to process; it is not necessarily the first byte of a line.

  8. Great article. Is there a way to read two lines cumulatively. For e.g.
    Line 1
    Line 2
    Line 3
    Line 4

    I want them to be read as:
    Line 1, Line 2
    Line 2, Line 3
    Line 3, Line 4

    and so on.

    Please let me know if you know a solution for the above.

    Thanks

  9. I want to have something like this:

    Input file contains:
    Line 1
    Line 2
    Line 3
    Line 4

    I want it to be read by the mapper like:
    Line 1, Line 2
    Line 2, Line 3
    Line 3, Line 4

    Is there any solution for this?

    Thanks a lot!

  10. Is it possible to customise the inputformat to read data from HDFS shell command e.g. hdfs dfs -cat file.txt. At the moment we have to pass the input folder/files but could we ‘cat’ data?
