I am working with the Hadoop MapReduce framework, following the book Hadoop: The Definitive Guide.
As described in the book, I have implemented a MapReduce job that reads each input file as a whole and writes the output using SequenceFileOutputFormat. Here are the classes I have implemented:
SmallFilesToSequenceFileConverter.java
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the name of the file backing this split as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.getName());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole file content keyed by its filename
            context.write(filenameKey, value);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setNumReduceTasks(2);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] argg = {"/Users/bng/Documents/hadoop/inputFromBook/smallFiles",
                "/Users/bng/Documents/hadoop/output_SmallFilesToSequenceFileConverter"};
        int exitcode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), argg);
        System.exit(exitcode);
    }
}
WholeFileInputFormat.java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split a file: each file becomes exactly one split
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
WholeFileRecordReader.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Read the whole file into a single value on the first call only
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close; the stream is closed in nextKeyValue()
    }
}
As shown in SmallFilesToSequenceFileConverter.java, when I use a single reduce task everything works fine and I get the expected output:
//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������xd[^•MÈÔg…h#Ÿa������a���
aaaaaaaaaa������b���
bbbbbbbbbb������c���
cccccccccc������d���
dddddddddd������dummy���ffffffffff
������e����������f���
ffffffffff
The problem appears when I use two reduce tasks: some of the data shows up in the output of both reducers. With two reduce tasks, here is the output.
//part-r-00000
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������ÓÙE˜xØÏXØâÆU.êÚ������a���
aaaaaaaaaa������b�
bbbbbbbbbb������c
cccccccccc������e����
//part-r-00001
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable������π¸ú∞8Á8˜lÍx∞:¿������b���
bbbbbbbbbb������d���
dddddddddd������dummy���ffffffffff
������f���
ffffffffff
This shows that the data "bbbbbbbbbb" is written by both reduce tasks. What could be the problem here? Is this result acceptable, or am I making a mistake somewhere?
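To double-check which keys each reducer actually wrote (rather than eyeballing the raw bytes above), the part files can be read back with SequenceFile.Reader. Here is a rough sketch, assuming a Text/BytesWritable sequence file; the class name and passing the part file path as the first argument are just my choices for illustration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class DumpSequenceFileKeys {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Path of one part file, e.g. the output directory's part-r-00000
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Print every key and the size of its value
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value.getLength() + " bytes");
            }
        } finally {
            reader.close();
        }
    }
}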
For reference, the input directory contains six input files named a to f, each containing data corresponding to its file name; e.g. the file named a contains the data "aaaaaaaaaaa", and the other files contain similar data, except the file e, which is empty. There is also a file named dummy, which contains the data "ffffffffff".
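As far as I understand, since no custom partitioner is set in the job above, the default HashPartitioner decides which reducer gets each key, and a given filename key should always go to exactly one reduce task, so the same key ending up in two part files looks wrong to me. A minimal sketch of that expectation (the PartitionCheck class is only for illustration):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionCheck {
    public static void main(String[] args) {
        // HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,
        // which is deterministic, so a given key always maps to the same reducer
        HashPartitioner<Text, BytesWritable> partitioner = new HashPartitioner<Text, BytesWritable>();
        int numReduceTasks = 2;
        for (String name : new String[] {"a", "b", "c", "d", "dummy", "e", "f"}) {
            int partition = partitioner.getPartition(new Text(name), new BytesWritable(), numReduceTasks);
            System.out.println(name + " -> part-r-0000" + partition);
        }
    }
}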
I haven't found the exact reason for this.
However, deleting the namenode and datanode directories specified in hdfs-site.xml and then restarting the HDFS, YARN and MR services solved the issue for me.