Recently, I implemented k-means as an iterative Hadoop operation. I needed to output two things during the reduce phase: the new centroids and a cost metric to evaluate convergence. Being relatively new to Hadoop, I wasn’t sure how to do this. All the introductory examples I had seen only produced a single reduce output.
A review of the Hadoop documentation revealed MultipleOutputs. It does exactly what I wanted: it allows mappers and reducers to output more than one thing. The following snippet includes the relevant code from my implementation:
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Flush and close the extra output streams.
        multipleOutputs.close();
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //
        // Reduce logic: compute centroidString and totalCost
        //

        // The new centroid goes under the "centroid" sub-directory.
        multipleOutputs.write(NullWritable.get(),
                              new Text(centroidString),
                              "centroid/part");

        // The convergence cost goes under the "cost" sub-directory.
        multipleOutputs.write(NullWritable.get(),
                              new Text(String.format("%f", totalCost)),
                              "cost/part");
    }
}
The first few lines define a private MultipleOutputs instance and the necessary setup and tear-down steps. The real action is in the reduce() routine. Two calls to multipleOutputs.write() create the two output streams. The first argument provides the output key, which in this case is a NullWritable. The second argument provides the value. The third argument defines the path and file name prefix for the output. In this example, the centroid data will be written to the centroid sub-directory in the specified output directory. The individual reducer output files will be of the form part-r-00000. The cost data is similarly stored under the cost sub-directory.
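One nice detail: since the write() calls above pass a baseOutputPath directly, the records go through the job's regular output format and no MultipleOutputs.addNamedOutput() registration is needed in the driver. For context, a minimal driver for a job like this might look something like the following. This is a sketch, not code from my implementation; KMeansMapper and the argument handling are placeholders. The one non-obvious line is the LazyOutputFormat wrapper, which suppresses the empty default part-r-* files that would otherwise sit alongside the MultipleOutputs files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class KMeansDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "k-means iteration");
        job.setJarByClass(KMeansDriver.class);

        job.setMapperClass(KMeansMapper.class);  // hypothetical mapper
        job.setReducerClass(Reduce.class);       // the reducer shown above

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Only MultipleOutputs writes records, so avoid empty default part files.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}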
I was really happy to discover MultipleOutputs. It enables performing multiple computations within a single map/reduce pass over the data. With the right problem and implementation, this can really increase the efficiency of a Hadoop job.
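As an aside, closing the loop on the iterative part: between passes, the driver has to read the cost back to decide whether another iteration is needed. Here is a sketch of how that might look, assuming the reducers wrote one numeric cost value per line under the cost sub-directory; readCost() is an illustrative helper, not code from my implementation:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: sum the totalCost values the reducers wrote
// under <outputDir>/cost/part-r-*.
static double readCost(Configuration conf, Path outputDir) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    double cost = 0.0;
    for (FileStatus status : fs.listStatus(new Path(outputDir, "cost"))) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(status.getPath())))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    cost += Double.parseDouble(line.trim());
                }
            }
        }
    }
    return cost;
}

// In the iteration loop, something like:
//   double cost = readCost(conf, new Path(outputDir));
//   if (Math.abs(previousCost - cost) < epsilon) break;  // converged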
Tags: Hadoop