Process Small Files on Hadoop Using CombineFileInputFormat (1)
Processing small files is an old, well-known problem in Hadoop. On Stack Overflow people suggest using CombineFileInputFormat, but I haven’t found a good step-by-step article that teaches you how to use it. So, I decided to write one myself.
A small file is one which is significantly smaller than the HDFS block size (default 64MB).
If you’re storing small files, then you probably have lots of them (otherwise you wouldn’t turn to Hadoop),
and the problem is that HDFS can’t handle lots of files well: every file is tracked in the namenode’s memory, and each small file becomes its own map task, so task startup overhead dominates the job.
In my benchmark, just using a custom CombineFileInputFormat can speed up the program from 3 hours to 23 minutes, and after some further tuning, the same task can be run in 6 minutes!
Benchmark Setup
To test the raw performance of different approaches to the small-files problem, I set up a map-only Hadoop job that basically just does a grep and performs a small binary search. The binary search part generates the reduce-side keys that I’ll use in further data processing; it takes only a small amount of resources to run (an 8MB index), so it does not affect the result of the benchmark.
The data to process is some server log data, 53.1 GB in total. The Hadoop cluster consists of 6 nodes, running Hadoop version 1.1.2. In this benchmark I implemented CombineFileInputFormat to shrink the number of map tasks; I also tested the difference between reusing JVMs or not, and different block sizes for combining files.
CombineFileInputFormat
The code listed here is modified from the Hadoop example code. To use CombineFileInputFormat you need to implement three classes. The class CombineFileInputFormat is an abstract class with no concrete implementation, so you must create a subclass to use it; we’ll name the subclass CFInputFormat. The subclass will initiate a delegate CFRecordReader that extends RecordReader; this is the code that does the file-processing logic. We’ll also need a class FileLineWritable, which replaces the LongWritable normally used as the key to file lines.
CFInputFormat.java
CFInputFormat.java doesn’t do much. You implement createRecordReader to pass in the record reader that does the combine-file logic; that’s all. Note that you can call setMaxSplitSize in the constructor to control the size of each chunk of files; if you don’t want files to be split in half, remember to return false from the isSplitable method, which defaults to true.
package org.idryman.combinefiles;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import org.idryman.combinefiles.CFRecordReader;
import org.idryman.combinefiles.FileLineWritable;

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {
  public CFInputFormat() {
    super();
    setMaxSplitSize(67108864); // 64 MB, default block size on hadoop
  }

  public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit) split, context, CFRecordReader.class);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }
}
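For completeness, here is a sketch of how the input format could be wired into a job driver. This skeleton is my own illustration, not part of the original example; the identity Mapper stands in for the benchmark’s grep mapper, which isn’t shown here.

```java
package org.idryman.combinefiles;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "combine-small-files");
    job.setJarByClass(Driver.class);

    // The only change from a normal job: use the combining input format.
    job.setInputFormatClass(CFInputFormat.class);

    job.setMapperClass(Mapper.class); // identity mapper; a real job would do the grep here
    job.setNumReduceTasks(0);         // map-only, as in the benchmark
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```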
CFRecordReader.java
CFRecordReader is a delegate class of CombineFileRecordReader, a built-in class that passes each chunk of a split (typically a whole file in this case) to our class CFRecordReader. When the Hadoop job starts, CombineFileRecordReader reads the sizes of all the files in HDFS that we want it to process, and decides how many splits to create based on the MaxSplitSize we defined in CFInputFormat. For every file in a split (each must be a whole file, because we set isSplitable to false), CombineFileRecordReader creates a CFRecordReader instance via a custom constructor, and passes in the CombineFileSplit, the context, and an index for CFRecordReader to locate the file to process.
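A minimal sketch of what that delegate can look like, modeled on the Hadoop example code (error handling trimmed; the three-argument constructor is the contract CombineFileRecordReader invokes reflectively):

```java
package org.idryman.combinefiles;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

public class CFRecordReader extends RecordReader<FileLineWritable, Text> {
  private long startOffset; // offset of this file's region in the combined split
  private long end;         // end of this file's region
  private long pos;         // current position
  private Path path;
  private FileLineWritable key;
  private Text value;
  private LineReader reader;

  // Called reflectively by CombineFileRecordReader with the combined split,
  // the task context, and the index of the file to read within the split.
  public CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException {
    this.path = split.getPath(index);
    this.startOffset = split.getOffset(index); // 0 here, since files are not split
    this.end = startOffset + split.getLength(index);
    FileSystem fs = path.getFileSystem(context.getConfiguration());
    FSDataInputStream in = fs.open(path);
    in.seek(startOffset);
    this.reader = new LineReader(in);
    this.pos = startOffset;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    // All setup is done in the constructor.
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new FileLineWritable();
      key.fileName = path.getName();
    }
    key.offset = pos;
    if (value == null) value = new Text();
    if (pos < end) {
      int newSize = reader.readLine(value);
      pos += newSize;
      if (newSize != 0) return true;
    }
    return false;
  }

  @Override public FileLineWritable getCurrentKey() { return key; }
  @Override public Text getCurrentValue() { return value; }

  @Override
  public float getProgress() {
    if (end == startOffset) return 0.0f;
    return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
  }

  @Override
  public void close() throws IOException {
    if (reader != null) reader.close();
  }
}
```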
When processing a file, CFRecordReader creates a FileLineWritable as the key for the Hadoop mapper class. For each line, the FileLineWritable contains the file name and the offset of that line. The difference between FileLineWritable and the LongWritable normally used in a mapper is that LongWritable only denotes the offset of a line within a file, while FileLineWritable adds the file information to the key.
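Such a key class can be sketched as below, again following the shape of the Hadoop example code (the 163 multiplier in hashCode is an arbitrary choice of mine):

```java
package org.idryman.combinefiles;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class FileLineWritable implements WritableComparable<FileLineWritable> {
  public long offset;      // byte offset of the line within the file
  public String fileName;  // name of the file the line came from

  public void readFields(DataInput in) throws IOException {
    this.offset = in.readLong();
    this.fileName = Text.readString(in);
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(offset);
    Text.writeString(out, fileName);
  }

  // Sort by file name first, then by offset, so lines of the same file group together.
  public int compareTo(FileLineWritable that) {
    int cmp = this.fileName.compareTo(that.fileName);
    if (cmp != 0) return cmp;
    return (this.offset < that.offset) ? -1 : (this.offset == that.offset) ? 0 : 1;
  }

  @Override
  public int hashCode() {
    return fileName.hashCode() * 163 + (int) offset;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FileLineWritable)) return false;
    FileLineWritable that = (FileLineWritable) o;
    return this.offset == that.offset && this.fileName.equals(that.fileName);
  }
}
```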