javahadoopmapreducewritable

Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector


I have created a hadoop custom writable as shown below

public class ResultType implements Writable {

    private Text xxxx;
    private Text yyyy;
    private Text zzzz;

    public ResultType() {}

    public ResultType(Text xxxx, Text yyyy, Text zzzz) {
        this.xxxx = xxxx;
        this.yyyy = yyyy;
        this.zzzz = zzzz;   
    }

    public Text getxxxx() {
        return this.xxxx;
    }

    public Text getyyyy() {
        return this.yyyy;
    }

    public Text getzzzz() {
        return this.zzzz;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        xxxx.readFields(in);
        yyyy.readFields(in);
        zzzz.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        xxxx.write(out);
        yyyy.write(out);
        zzzz.write(out);
    }
}

My mapper code is

public static class Mapper1 extends TableMapper<Text, ResultType> {

    private Text text = new Text();

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {

        // getting name value

        String xxxx = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("xxxx")));
        String yyyy = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("yyyy")));
        String zzzz = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("zzzz")));
        text.set(xxxx);
        context.write(text, new ResultType(new Text(xxxx), new Text(yyyy), new Text(zzzz)));

    }
}

My Reducer code is

public static class Reducer1 extends Reducer<Text, ResultType, Text, ResultType> {

    public void reduce(Text key, Iterable<ResultType> values, Context context)
            throws IOException, InterruptedException {

        List<ResultType> returnset = new ArrayList<ResultType>();
        Map<String, ResultType> duplicatelist = new HashMap<String, ResultType>();
        boolean iskeyadded = true;

        for (ResultType val : values) {

            Text yyyy = val.getyyyy();
            Text zzzz = val.getzzzz();

            String groupkey = yyyy + "," + zzzz ;

            if (duplicatelist.containsKey(groupkey)) {

                if (iskeyadded) {
                    context.write(key, new ResultType(new Text(key), new Text(yyyy),
                            new Text(zzzz)));
                    iskeyadded = false;
                }

                context.write(key, new ResultType(new Text(key), new Text(yyyy), new Text(zzzz)));

            } else {
                duplicatelist.put(groupkey, val);
            }
        }

    }
}

when I run this code I am getting

Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@20890b6f
java.lang.NullPointerException
    at test.ResultType.readFields(ResultType.java)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1688)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1637)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1489)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
    at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2019)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:797)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Solution

  • You're getting a NullPointerException because none of the Text objects in your custom writable are created anywhere. You could just create them where you declare them at the top of the class.

    private Text xxxx = new Text();
    private Text yyyy = new Text();
    private Text zzzz = new Text();
    

    I would also suggest that your constructor that sets them changes to:

    public ResultType(Text xxxx, Text yyyy, Text zzzz) {
        this.xxxx.set(xxxx);
        this.yyyy.set(yyyy);
        this.zzzz.set(zzzz);   
    } 
    

    Unlike strings, Text objects are not immutable, so making them equal doesnt create a new Text object. This will cause problems if you're trying to reuse the Text objects elsewhere.