java, mapreduce, hadoop2, reducers, sequencefile

Why is my Hadoop MapReduce application processing the same data in two different reduce tasks?


I am working with the Hadoop MapReduce framework and following the book Hadoop: The Definitive Guide.

As described in the book, I have implemented a MapReduce job that reads each input file as a whole and delegates the output to a SequenceFileOutputFormat. Here are the classes I have implemented:

SmallFilesToSequenceFileConverter.java

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{
        private Text filenameKey;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Use the name of the file backing this split as the key for every record
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.getName());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole file content, keyed by the file name
            context.write(filenameKey, value);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf()); // deprecated in Hadoop 2; Job.getInstance(getConf()) is the preferred form

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setMapperClass(SequenceFileMapper.class);
        // No reducer class is set, so the default identity reducer is used;
        // with two reduce tasks the output is partitioned into part-r-00000 and part-r-00001
        job.setNumReduceTasks(2);

        return job.waitForCompletion(true) ? 0 : 1;

    }

    public static void main(String[] args) throws Exception{

        String argg[] = {"/Users/bng/Documents/hadoop/inputFromBook/smallFiles",
        "/Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter"}; 

        int exitcode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), argg);
        System.exit(exitcode);
    }
}

WholeFileInputFormat.java

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split a file: each file is handled as a single record
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

WholeFileRecordReader.java

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (!processed) {
            // Read the entire file into a single value; let IO errors propagate
            // instead of being swallowed by a catch block
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
    }

}

As specified in SmallFilesToSequenceFileConverter.java above, when I use a single reduce task everything works fine and I get the expected output:

//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������xd[^•MÈÔg…h#Ÿa������a���
aaaaaaaaaa������b���
bbbbbbbbbb������c���
cccccccccc������d���
dddddddddd������dummy���ffffffffff
������e����������f���
ffffffffff
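
To make the dumps easier to read: the unprintable bytes are just the SequenceFile container header and sync markers. The part files can be decoded with hdfs dfs -text, or with a small standalone reader like the sketch below (this assumes the standard Hadoop 2 SequenceFile.Reader API; the class name DumpSequenceFile is only for illustration):

DumpSequenceFile.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class DumpSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // e.g. /Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter/part-r-00000
        Path path = new Path(args[0]);

        SequenceFile.Reader reader =
                new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Print each record's key (the original file name) and the value length in bytes
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        } finally {
            reader.close();
        }
    }
}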

But when I use two reduce tasks, some of the data appears in the output of both of them. With two reduce tasks, here is the output:

//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������ÓÙE˜xØÏXØâÆU.êÚ������a���
aaaaaaaaaa������b�
bbbbbbbbbb������c
cccccccccc������e����

//part-r-00001
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������π¸ú∞8Á8˜lÍx∞:¿������b���
bbbbbbbbbb������d���
dddddddddd������dummy���ffffffffff
������f���
ffffffffff

This shows that the data "bbbbbbbbbb" is processed by both reduce tasks. What could be the problem here? Is it fine to have this result, or am I making a mistake somewhere?
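
For context, unless a custom partitioner is configured, the map output key decides the reducer via the default HashPartitioner, so a key like "b" should land in exactly one of the two partitions. The following is only a small standalone sketch of that routing rule (the class name PartitionCheck is made up for illustration):

PartitionCheck.java

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionCheck {
    public static void main(String[] args) {
        HashPartitioner<Text, BytesWritable> partitioner = new HashPartitioner<Text, BytesWritable>();
        int numReduceTasks = 2;
        for (String name : new String[] {"a", "b", "c", "d", "dummy", "e", "f"}) {
            // HashPartitioner routes by (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
            int partition = partitioner.getPartition(new Text(name), new BytesWritable(), numReduceTasks);
            System.out.println(name + " -> reducer " + partition);
        }
    }
}

With that routing, each key should end up in only one part file, which is why the duplicate "b" looks wrong.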

For reference, the input directory contains six input files named a to f, each containing data corresponding to its file name; for example, the file named a contains the data "aaaaaaaaaa", and the other files contain similar data, except for the file e, which is empty. There is also a file named dummy, which contains the data "ffffffffff".
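
In case anyone wants to reproduce the input, here is a minimal local sketch (plain java.nio; the directory is the one hard-coded in main(), the class name CreateSmallFiles is only illustrative, and the files were copied into HDFS afterwards with hdfs dfs -put):

CreateSmallFiles.java

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CreateSmallFiles {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get("/Users/bng/Documents/hadoop/inputFromBook/smallFiles");
        Files.createDirectories(dir);
        for (char c = 'a'; c <= 'f'; c++) {
            // Ten copies of the file's own letter, except "e", which stays empty
            String content = (c == 'e') ? "" : new String(new char[10]).replace('\0', c);
            Files.write(dir.resolve(String.valueOf(c)), content.getBytes(StandardCharsets.UTF_8));
        }
        // The extra "dummy" file carries the same content as f
        Files.write(dir.resolve("dummy"), "ffffffffff".getBytes(StandardCharsets.UTF_8));
    }
}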


Solution

  • I don't know the exact reason for this.

    But deleting the name node and data node directories (the locations configured in hdfs-site.xml) and then restarting the HDFS, YARN and MapReduce services solved the issue for me.