I have a map-only Hadoop job running on Amazon EMR with the latest AMI version, 3.0.4. Once in a while I get exceptions like this:
Error: com.amazonaws.AmazonClientException: Unable to verify integrity of data download. Client calculated content length didn't match content length received from Amazon S3. The data may be corrupt.
at com.amazonaws.util.ContentLengthValidationInputStream.validate(ContentLengthValidationInputStream.java:144)
at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:81)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.read(EmrFileSystem.java:289)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:149)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:866)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:423)
at org.apache.hadoop.io.compress.BZip2Codec.read(BZip2Codec.java:483)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:164)
at org.apache.hadoop.mapred.MapTask.nextKeyValue(MapTask.java:544)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Is there any way to cure this? Why does it happen? Is it a network problem on Amazon's side? It can't be a problem with the input file, because re-running the same job usually succeeds. Is there a way to catch this exception? Why doesn't Hadoop recover from it automatically?
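The only workaround I can think of is retrying the whole job from the driver, since the exception is raised while the framework reads the next input record (in LineRecordReader), not inside my map() method. Below is a minimal sketch of such a retry loop, assuming a hypothetical LogParserDriver entry point and an arbitrary retry budget:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;

public class LogParserDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: LogParserDriver <input dir> <output dir>");
            System.exit(-1);
        }
        final int maxAttempts = 3; // arbitrary retry budget
        int exitCode = 1;
        for (int attempt = 1; attempt <= maxAttempts && exitCode != 0; attempt++) {
            Configuration conf = new Configuration();
            // The output directory must not exist when the job is resubmitted,
            // otherwise FileOutputFormat rejects it.
            Path output = new Path(args[1]);
            output.getFileSystem(conf).delete(output, true);
            exitCode = ToolRunner.run(conf, new LogParserMapReduce(), args);
        }
        System.exit(exitCode);
    }
}

But that feels like papering over the real problem, so I'd rather understand the cause.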
My main class looks like this:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class LogParserMapReduce extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(LogParserMapReduce.class);

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setBoolean("keep.failed.task.files", true);

        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = Job.getInstance(conf);

        /*
         * The expected command-line arguments are the paths containing
         * input and output data. Terminate the job if the number of
         * command-line arguments is not exactly 2.
         */
        if (args.length != 2) {
            System.out.printf("Usage: LogParserMapReduce <input dir> <output dir>\n");
            System.exit(-1);
        }

        /*
         * Specify the jar file that contains the driver and mapper.
         * Hadoop will transfer this jar file to the nodes in the cluster
         * that run the mapper tasks.
         */
        job.setJarByClass(LogParserMapReduce.class);

        /*
         * Specify an easily decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("LogParser");

        /*
         * Specify the paths to the input and output data based on the
         * command-line arguments, and compress the output with gzip.
         */
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        /*
         * Specify the mapper class. This is a map-only job, so no reducer
         * class is set.
         */
        job.setMapperClass(LogParserMapper.class);

        /*
         * The input and output files are in text format - the default -
         * where each record is a line delimited by a line terminator.
         * When you use other input or output formats, you must call
         * setInputFormatClass or setOutputFormatClass respectively.
         */

        /*
         * Specify the job's output key and value classes. In a map-only job
         * these are the mapper's output types, so no separate
         * setMapOutputKeyClass/setMapOutputValueClass calls are needed.
         */
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        LOG.info("LogParserMapReduce: waitingForCompletion");

        /*
         * Start the MapReduce job and wait for it to finish.
         * If it finishes successfully, return 0. If not, return 1.
         */
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
The solution was very simple (once Amazon's customer support pointed it out): I had to upgrade to the latest AMI (currently 3.1.0), which ships the latest Hadoop (2.4), and also make sure that I compiled the Java code against the same Hadoop version. Since then I haven't seen this kind of problem.
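In hindsight, one way to spot this kind of mismatch earlier is to log the Hadoop version the cluster is actually running and compare it with the hadoop-client version the jar was compiled against. A small fragment, just illustrative, dropped at the top of run() and reusing the existing LOG:

import org.apache.hadoop.util.VersionInfo;

// At the top of LogParserMapReduce.run(): logs the Hadoop version of the
// cluster the job runs on, for comparison with the compile-time version.
LOG.info("Running against Hadoop " + VersionInfo.getVersion()
        + " (" + VersionInfo.getBuildVersion() + ")");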