In the last post I showed how to use the Mapper context object to obtain Path information. That's a handy hack for ad-hoc jobs; however, it's neither reusable nor abstract. In this post, I'll show you how to subclass Text, TextInputFormat, and LineRecordReader to create components you can reuse across all of your Hadoop tasks.
Input Workflow
Before we go through all the classes, let me show you how Hadoop reads the files in.
By default, Hadoop uses TextInputFormat, which inherits from FileInputFormat, to process the input files.
TextInputFormat allocates a LineRecordReader and passes it to the Task runtime.
The Task first initializes the LineRecordReader, then wraps it in a Context object.
In Mapper's run method, it calls nextKeyValue() on the Context, then reads the LongWritable key from context.getCurrentKey() and the Text value from context.getCurrentValue(). These methods delegate to LineRecordReader's nextKeyValue(), getCurrentKey(), and getCurrentValue().
Finally, Mapper passes the key-value pair to the map method that we usually override.
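To make the flow above concrete, here is roughly what the run loop looks like in the new-API Mapper (a paraphrase of the Hadoop source, not a verbatim copy; the class name `RunLoopSketch` is just for illustration):

```java
import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;

// Illustrative sketch of what Mapper.run does under the hood.
public class RunLoopSketch<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // Each nextKeyValue() call delegates to the RecordReader;
    // getCurrentKey()/getCurrentValue() read what it produced.
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}
```

This is why swapping in our own RecordReader is enough to change what the map method receives: everything flows through the Context's delegation.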
In order to put the path information into this workflow, we can extend the Text class and embed the path in it. To make this work, we need to create three new classes: TextWithPath, TWPInputFormat, and TWPRecordReader.
TextWithPath.java
Here is our content wrapper, TextWithPath. It doesn't do much: there's a new constructor which accepts a Path, and a getter method that returns it.
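A minimal sketch of the class might look like this (the exact version lives in the repo linked below):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

// A Text that also remembers which file it came from.
public class TextWithPath extends Text {
  private Path path;

  public TextWithPath() {
    super();
  }

  public TextWithPath(Path path) {
    super();
    this.path = path;
  }

  public Path getPath() {
    return path;
  }
}
```

Because it extends Text, it serializes exactly like a Text; the Path is carried only in memory on the map side, which is all we need.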
The new TWPInputFormat is almost identical to TextInputFormat, except that it uses TextWithPath instead of Text, and its createRecordReader method returns a TWPRecordReader instead of a LineRecordReader.
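A sketch of the input format (it assumes the TextWithPath and TWPRecordReader classes described in this post; compression handling from TextInputFormat is omitted for brevity):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Like TextInputFormat, but the value type is TextWithPath.
public class TWPInputFormat extends FileInputFormat<LongWritable, TextWithPath> {

  @Override
  public RecordReader<LongWritable, TextWithPath> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new TWPRecordReader();
  }
}
```

Since it extends FileInputFormat, the familiar static helpers such as `TWPInputFormat.addInputPath(job, path)` still work.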
Finally, TWPRecordReader is where the real logic lives. In the initialize method, we can cast the InputSplit to a FileSplit and pull the Path out of it. Next, we override nextKeyValue and update the value on every call. Lastly, remember to override getCurrentValue(); otherwise it will return the parent's value (a plain Text), not a TextWithPath.
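Putting those three overrides together, the reader might look like this (a sketch that assumes the TextWithPath class above; the repo linked below has the full version):

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class TWPRecordReader extends LineRecordReader {
  private TextWithPath value = null;
  private Path path = null;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException {
    super.initialize(split, context);
    // FileInputFormat hands us FileSplits, which carry the file Path.
    FileSplit fileSplit = (FileSplit) split;
    path = fileSplit.getPath();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (super.nextKeyValue()) {
      if (value == null) {
        value = new TextWithPath(path);
      }
      // Copy the line the parent just read into our wrapper.
      value.set(super.getCurrentValue());
      return true;
    }
    value = null;
    return false;
  }

  @Override
  public TextWithPath getCurrentValue() {
    // Covariant return: without this override, callers would get the
    // parent's plain Text, not our TextWithPath.
    return value;
  }
}
```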
Here is a demo to test the output. In addition to the usual MapReduce setup, we set the input format class to TWPInputFormat; on the Mapper side, we expect the input value to be a TextWithPath, not a Text. The whole program can be downloaded from this github repo: Hadoop TextWithPath.
package org.idryman;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DemoRun extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new DemoRun(), args));
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf);
    job.setJobName("test TextWithPath Input");
    job.setJarByClass(DemoRun.class);

    TWPInputFormat.addInputPath(job, new Path(args[0]));
    job.setInputFormatClass(TWPInputFormat.class);
    job.setMapperClass(TestMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(IntSumReducer.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.submit();
    job.waitForCompletion(true);
    return 0;
  }

  public static class TestMapper
      extends Mapper<LongWritable, TextWithPath, Text, IntWritable> {
    /**
     * Only override `run` instead of the `map` method, because we just want
     * to see one output per mapper, instead of printing every line.
     */
    @Override
    public void run(Context context) throws IOException, InterruptedException {
      context.nextKeyValue();
      TextWithPath twp = context.getCurrentValue();
      context.write(new Text(twp.getPath().toString()), new IntWritable(1));
    }
  }
}
One more thing
I wrote another Hadoop utility that reads a header file from the HDFS input source and passes a FieldWritable object to the Mapper class instead of a Text. FieldWritable implements the Map interface, so you can look up TSV fields by their header keys. The project is on github but still highly experimental. Once the API and implementation are stable, I'll write another post to introduce it. Enjoy!