javahadoopmapreducewritable

Hadoop MapReduce chain with ArrayWritable


I'm trying to create a mapreduce chain composed of two steps. The first reduce emit key-value pairs as (key, value) where value is a list of custom object and the second mapper should read the output of the first reducer. The list is a custom ArrayWritable. Here is the relevant code:

Custom object:

public class Custom implements Writable {
    private Text document;
    private IntWritable count;

    public Custom(){
        setDocument("");
        setCount(0);
    }

    public Custom(String document, int count) {
        setDocument(document);
        setCount(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        document.readFields(in);
        count.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        document.write(out);
        count.write(out);
    }

    @Override
    public String toString() {
        return this.document.toString() + "\t" + this.count.toString();
    }

    public int getCount() {
        return count.get();
    }

    public void setCount(int count) {
        this.count = new IntWritable(count);
    }

    public String getDocument() {
        return document.toString();
    }

    public void setDocument(String document) {
        this.document = new Text(document);
    }
}

Custom ArrayWritable:

 class MyArrayWritable extends ArrayWritable {
    public MyArrayWritable(Writable[] values) {
        super(Custom.class, values);
    }

    public MyArrayWritable() {
        super(Custom.class);
    }

    @Override
    public Custom[] get() {
        return (Custom[]) super.get();
    }

    @Override
    public String toString() {
      return Arrays.toString(get());
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        super.write(arg0);
    }
}

First reducer:

public static class NGramReducer extends Reducer<Text, Text, Text, MyArrayWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //other code
        context.write(key, mArrayWritable);
    }
}

Second mapper:

public static class SecondMapper extends Mapper<Text, MyArrayWritable, Text, IntWritable> {
    private StringBuilder docBuilder= new StringBuilder();

    public void map(Text key, MyArrayWritable value, Context context) throws IOException, InterruptedException {
        //whatever code
    }
}

And these are the setting in main:

    //...
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(MyArrayWritable.class);
    job1.setInputFormatClass(WholeFileInputFormat.class);
    FileInputFormat.addInputPath(job1, new Path(args[2]));
    FileOutputFormat.setOutputPath(job1, TEMP_PATH);
    //...
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job2, TEMP_PATH);
    FileOutputFormat.setOutputPath(job2, new Path(args[3]));

When I run it i get this error Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to Detector$MyArrayWritable

What is the problem? Do I have to write a FileInputFormat? (job1 works fine)


Solution

  • It looks as though this is because of your job 2 InputFormat. KeyValueTextInputFormat.class expects a key and value which are both Text objects. As your job 1 outputs (Text,MyArrayWritable), there is a conflict with the value.

    Luckily you don't have to write a custom OutputFormat to cater for your data! Simply write the output of your job 1 data into sequence files, which keeps the data in its binary form:

    //...
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(MyArrayWritable.class);
    job1.setInputFormatClass(WholeFileInputFormat.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job1, new Path(args[2]));
    SequenceFileOutputFormat.setOutputPath(job1, TEMP_PATH);
    //...
    job2.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job2, TEMP_PATH);
    FileOutputFormat.setOutputPath(job2, new Path(args[3]));