April 17, 2013

One Reduce, Two Outputs

Recently, I implemented k-means as an iterative Hadoop job. I needed to output two things during the reduce phase: the new centroids and a cost metric for evaluating convergence. Being relatively new to Hadoop, I wasn’t sure how to do this. All the introductory examples I had seen produced only a single reduce output.

A review of the Hadoop documentation turned up MultipleOutputs, which does exactly what I wanted: it lets mappers and reducers write more than one output. The following snippet includes the relevant code from my implementation:

    // These imports belong at the top of the enclosing job class's source file.
    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context)
                    throws IOException, InterruptedException {
            // Create one MultipleOutputs helper per reduce task.
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void cleanup(Context context)
                    throws IOException, InterruptedException {
            // Close the helper so all of its output files are flushed.
            multipleOutputs.close();
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {

            //
            // Reduce logic: computes centroidString and totalCost for this key
            //

            // Write the new centroid under the centroid/ sub-directory.
            multipleOutputs.write(NullWritable.get(),
                                          new Text(centroidString),
                                          "centroid/part");

            // Write the cost contribution under the cost/ sub-directory.
            multipleOutputs.write(NullWritable.get(),
                          new Text(String.format("%f", totalCost)),
                          "cost/part");
        }
    }

The first few lines define a private MultipleOutputs instance and the necessary setup and tear-down steps. The real action is in the reduce() routine, where two calls to multipleOutputs.write() create the two output streams. The first argument provides the output key, which in this case is a NullWritable. The second argument provides the value. The third argument is the base path, which determines the sub-directory and file-name prefix for the output. In this example, the centroid data is written to the centroid sub-directory of the job's output directory, in reducer output files of the form part-r-00000. The cost data is similarly stored under the cost sub-directory.
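
For context, here is a minimal sketch of what the driver side might look like. It is not from my original implementation; the class names (KMeansDriver, KMeansMapper) and the use of LazyOutputFormat to suppress the empty default part files are assumptions on my part. The key point is that the centroid/ and cost/ base paths above are resolved relative to the directory passed to FileOutputFormat.setOutputPath().

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class KMeansDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "kmeans iteration");
            job.setJarByClass(KMeansDriver.class);

            job.setMapperClass(KMeansMapper.class); // hypothetical mapper class
            job.setReducerClass(Reduce.class);      // the reducer shown above

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            // Optional: suppress the empty default part-r-* files so only the
            // centroid/ and cost/ outputs appear under the output directory.
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }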

I was really happy to discover MultipleOutputs. It makes it possible to perform multiple computations within a single map/reduce pass over the data, which, with the right problem and implementation, can noticeably improve the efficiency of a Hadoop job.
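
Since the cost output is what drives the convergence test between iterations, it may help to sketch how a driver loop could read it back. This is not from my original implementation; the helper name, the directory layout, and the assumption that the values were written by a plain text output format (one cost per line) are mine.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CostReader {

        // Sum the per-key costs written under <outputDir>/cost by all reducers.
        public static double readTotalCost(Configuration conf, Path outputDir)
                throws Exception {
            FileSystem fs = FileSystem.get(conf);
            double total = 0.0;
            for (FileStatus status : fs.listStatus(new Path(outputDir, "cost"))) {
                String name = status.getPath().getName();
                // Skip _SUCCESS markers, checksum files, and other hidden files.
                if (name.startsWith("_") || name.startsWith(".")) {
                    continue;
                }
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(fs.open(status.getPath())));
                String line;
                while ((line = reader.readLine()) != null) {
                    // Each non-empty line holds one cost value, because the
                    // reducer's key is NullWritable and only the value is written.
                    if (!line.trim().isEmpty()) {
                        total += Double.parseDouble(line.trim());
                    }
                }
                reader.close();
            }
            return total;
        }
    }

The iteration loop can then compare this total against the previous pass's cost and stop once the change falls below a chosen tolerance.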

Tags:  Hadoop