I have a simple mapreduce code with mapper, reducer and combiner. The output from mapper is passed to combiner. But to the reducer, instead of output from combiner,output from mapper is passed.
Kindly help
Code:
package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class AverageSalary
{
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] empDetails= value.toString().split(",");
Text unit_key = new Text(empDetails[1]);
DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
context.write(unit_key,salary_value);
}
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text>
{
public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
{
String val;
double sum=0;
int len=0;
while (values.iterator().hasNext())
{
sum+=values.iterator().next().get();
len++;
}
val=String.valueOf(sum)+":"+String.valueOf(len);
try {
context.write(key,new Text(val));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text,Text, Text,Text>
{
public void reduce (final Text key, final Text values, final Context context)
{
//String[] sumDetails=values.toString().split(":");
//double average;
//average=Double.parseDouble(sumDetails[0]);
try {
context.write(key,values);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String args[])
{
Configuration conf = new Configuration();
try
{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Main <in> <out>");
System.exit(-1); }
Job job = new Job(conf, "Average salary");
//job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setJarByClass(AverageSalary.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : -1);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
It seems that you forgot about important property of a combiner:
the input types for the key/value and the output types of the key/value need to be the same.
You can't take in a Text/DoubleWritable
and return a Text/Text
. I suggest you to use Text
Instead DoubleWritable
, and do proper parsing inside Combiner
.