My mapreduce application look like this. I want to sum the 3 values from the strings
public class StockCount {
public static class MapperClass
extends Mapper<Object, Text, Text, IntArrayWritable> {
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line[] = value.toString().split(",");
//mgrno,rdate,cusip,shares,sole,shared,no
// [0], [1], [2], [3], [4], [5],[6]
if (line.length > 5){
Text mgrno = new Text(line[0]);
IntWritable[] intArray = new IntWritable[3];
intArray[0] = new IntWritable(Integer.parseInt(line[4]));
intArray[1] = new IntWritable(Integer.parseInt(line[5]));
intArray[2] = new IntWritable(Integer.parseInt(line[6]));
int[] pass = new int[3];
pass[0] = Integer.parseInt(line[4]);
pass[1] = Integer.parseInt(line[5]);
pass[0] = Integer.parseInt(line[6]);
IntArrayWritable array = new IntArrayWritable(intArray);
context.write(mgrno, array);
}
}
}
public static class IntSumReducer
extends Reducer<Text, int[], Text, IntArrayWritable> {
public void reduce(Text key, Iterable<IntArrayWritable> values,
Context context
) throws IOException, InterruptedException {
int sum1 = 0;
int sum2 = 0;
int sum3 = 0;
for (IntArrayWritable val : values) {
IntWritable[] temp = new IntWritable[3];
temp = val.get();
sum1 += temp[0].get();
sum2 += temp[1].get();
sum3 += temp[2].get();
}
IntWritable[] intArray = new IntWritable[3];
intArray[0] = new IntWritable(sum1);
intArray[1] = new IntWritable(sum2);
intArray[2] = new IntWritable(sum3);
IntArrayWritable result = new IntArrayWritable(intArray);
context.write(key, result);
}
}
As I want to sum 3 of my values, I defined a Class IntArrayWritable inherited from ArrayWritable. ArrayWritable contains Writable[]-s
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
public class IntArrayWritable extends ArrayWritable {
public IntArrayWritable(IntWritable[] values) {
super(IntWritable.class, values);
}
public IntArrayWritable() {
super(IntWritable.class);
}
@Override
public IntWritable[] get() {
return (IntWritable[]) super.get();
}
@Override
public String toString() {
IntWritable[] values = get();
return values[0].toString() + ", " + values[1].toString() + ", " +
values[2].toString();
}
}
I don't really see why it can't cast "return (IntWritable[]) super.get();"
17/11/21 04:00:26 WARN mapred.LocalJobRunner: job_local1623924180_0001
java.lang.Exception: java.lang.ClassCastException: [Lorg.apache.hadoop.io.Writable; cannot be cast to [Lorg.apache.hadoop.io.IntWritable;
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: [Lorg.apache.hadoop.io.Writable; cannot be cast to [Lorg.apache.hadoop.io.IntWritable;
at IntArrayWritable.get(IntArrayWritable.java:15)
at IntArrayWritable.toString(IntArrayWritable.java:22)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I would really appreciate the help.
Thans!
First of all, Reducer<Text, int[],
should have a Writable type instead of int[]
However, you can just use a comma separated Text Writable value from the mapper.
There's no clear benefit to writing your own Writable class only for passing an array.
You can parse and sum from the reducer