Here is the next article in the series. In the last post we learned how to write wordcount without writing explicit custom mappers or reducers. You can find that post here.
Today we will go a step further and rewrite the same wordcount program with our own custom mapper and reducer.
We will use two classes in addition to our wordcount class:
Class WCMap as the mapper class and Class WCReduce as the reducer class.
The mapper class WCMap is created by extending the class Mapper<LongWritable, Text, Text, IntWritable>. Here …
The first generic param, LongWritable, is the byte offset of the line within the input file; we can ignore it for now.
The second param, Text, represents each line of input that our map function receives, one at a time.
The third param, again Text, is the type of the key that the mapper will output.
The fourth param, IntWritable, is the type of the value that the mapper will output for each key.
We will also override the map function inherited from the Mapper class. In this function we will split the input line into words and emit each word with a count of 1 to Hadoop via its Context.
Here is the listing of the mapper …
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMap extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        String[] tokens = line.split(" "); // a single space is the delimiter between words
        for (String tok : tokens)
        {
            context.write(new Text(tok), new IntWritable(1)); // emit (word, 1)
        }
    }
}
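One thing to watch with the delimiter: String.split(" ") splits on exactly one space, so consecutive spaces produce empty tokens that would be counted as "words". A small plain-Java sketch of the difference (no Hadoop needed to try this):

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        String line = "hello  world"; // note the double space
        // split(" ") leaves an empty token between the two spaces
        System.out.println(Arrays.toString(line.split(" ")));    // [hello, , world]
        // split("\\s+") collapses a run of whitespace into one delimiter
        System.out.println(Arrays.toString(line.split("\\s+"))); // [hello, world]
    }
}
```

Switching the delimiter to the regex "\\s+" in WCMap is an easy way to avoid counting empty strings.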
Now that we have the mapper ready, let's look at the reducer. The reducer is a class that extends the class Reducer, and we will override its reduce function. Like the mapper, the reducer takes four type params:
Param 1 : the input key type (the mapper's output key type)
Param 2 : the input value type (the mapper's output value type; the values for one key arrive as an Iterable)
Param 3 : the output key type of this reducer
Param 4 : the output value type of this reducer.
In the reducer we will sum up the counts for all values in the list that we get for the same key and output the final count.
Here is the listing of the reducer …
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int count = 0;
        for (IntWritable item : values) // sum the 1s emitted by the mapper
        {
            count += item.get();
        }
        context.write(key, new IntWritable(count)); // emit (word, total)
    }
}
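The heart of reduce is just a per-key sum, which you can verify in plain Java before running it on a cluster. A minimal sketch, assuming the values arrive as a list of ints rather than IntWritables (the name sumCounts is mine, not a Hadoop API):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // Mirrors the body of WCReduce.reduce: sum all counts seen for one key.
    static int sumCounts(List<Integer> values) {
        int count = 0;
        for (int v : values) {
            count += v;
        }
        return count;
    }

    public static void main(String[] args) {
        // Three occurrences of the same word, each emitted as a 1 by the mapper.
        System.out.println(sumCounts(Arrays.asList(1, 1, 1))); // prints 3
    }
}
```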
Now that we have both the mapper and the reducer, let's tell our job which classes will act as the mapper and the reducer.
Here is the listing of our main setup class…
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class wordcount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options and returns the job's exit code
        int exitCode = ToolRunner.run(new Configuration(), new wordcount(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());       // use the configuration ToolRunner gave us
        job.setJarByClass(wordcount.class); // so the task JVMs can locate our classes
        job.setMapperClass(WCMap.class);
        job.setReducerClass(WCReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion submits the job and blocks; by convention return 0 on success
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
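To sanity-check the overall logic without a cluster, the map/shuffle/reduce flow can be simulated in plain Java with a HashMap standing in for the shuffle. This is only a sketch to illustrate the data flow (the name simulateWordCount is mine, not part of any Hadoop API):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Map phase: emit (word, 1) per token; shuffle groups by key; reduce sums.
    static Map<String, Integer> simulateWordCount(String[] lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {              // each line is one map() call
            for (String tok : line.split(" ")) { // same delimiter as WCMap
                Integer old = counts.get(tok);   // grouping by key, as the shuffle does
                counts.put(tok, old == null ? 1 : old + 1); // summing, as WCReduce does
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] input = { "hello world", "hello hadoop" };
        System.out.println(simulateWordCount(input));
    }
}
```

Running this on the two sample lines yields a count of 2 for "hello" and 1 each for "world" and "hadoop", which is exactly what the MapReduce job produces on the same input.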
Tada … and there we have it: our wordcount program with a custom mapper and a custom reducer, ready and working …
Please don't forget to give your feedback in the comments.
Thanks
Ciao.
Thank you so much for the example. I have copied the code into eclipse and packaged it into a jar. But when I try to run it using
hduser@nikhil-VirtualBox:/usr/local/hadoop/hadoop-1.0.4$ bin/hadoop jar mywordcount.jar /user/hduser/input /user/hduser/output3
I get the following
12/10/30 14:59:38 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/10/30 14:59:38 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
12/10/30 14:59:38 INFO input.FileInputFormat: Total input paths to process : 14
12/10/30 14:59:38 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/10/30 14:59:38 WARN snappy.LoadSnappy: Snappy native library not loaded
12/10/30 14:59:40 INFO mapred.JobClient: Running job: job_201210301426_0003
12/10/30 14:59:41 INFO mapred.JobClient: map 0% reduce 0%
12/10/30 14:59:58 INFO mapred.JobClient: Task Id : attempt_201210301426_0003_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: com.test.WCMap
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:867)
at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:199)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:719)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassNotFoundException: com.test.WCMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:820)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:865)
… 8 more
12/10/30 14:59:59 INFO mapred.JobClient: Task Id : attempt_201210301426_0003_m_000001_0, Status : FAILED
12/10/30 15:00:07 INFO mapred.JobClient: Task Id : attempt_201210301426_0003_m_000000_1, Status : FAILED
missing class name
Well I seem to be getting the same error as above. Can you please shed some light on it?
Please add the static keyword in front of your mapper class if you have included the map code in the same file as the main class.