amazon-web-services hadoop amazon-emr bzip2

Hadoop failure copying input bz2 file from s3


I have a map-only Hadoop job running on Amazon EMR with the latest AMI version (3.0.4). Once in a while I get exceptions like this:

Error: com.amazonaws.AmazonClientException: Unable to verify integrity of data download.  Client calculated content length didn't match content length received from Amazon S3.  The
data may be corrupt.
    at com.amazonaws.util.ContentLengthValidationInputStream.validate(ContentLengthValidationInputStream.java:144)
    at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:81)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.read(EmrFileSystem.java:289)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:866)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:423)
    at org.apache.hadoop.io.compress.BZip2Codec.read(BZip2Codec.java:483)
    at java.io.InputStream.read(InputStream.java:101)

    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:164)
    at org.apache.hadoop.mapred.MapTask.nextKeyValue(MapTask.java:544)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

Is there any way to cure this? Why does this happen? Is it a network problem on Amazon's side? It can't be a problem with the input file, as re-running the same job usually succeeds. Is there a way to catch this exception? Why doesn't Hadoop recover from it automatically?

My main class looks like this:

/**
 * Driver for the map-only "LogParser" MapReduce job.
 *
 * <p>Expects exactly two command-line arguments: the input path(s) and the
 * output directory. The job runs {@code LogParserMapper} with zero reduce
 * tasks and writes gzip-compressed output with {@code NullWritable} keys and
 * {@code Text} values.
 */
public class LogParserMapReduce extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LogParserMapReduce.class);

  /** Status returned by {@link #run(String[])} when the command line is malformed. */
  private static final int USAGE_ERROR = -1;

  /**
   * Configures and submits the job, then blocks until it finishes.
   *
   * @param args {@code args[0]} is the input path(s), {@code args[1]} the output directory
   * @return {@code 0} if the job succeeded, {@code 1} if it failed, or {@code -1}
   *     if the number of arguments is not exactly 2
   * @throws Exception if job setup or submission fails
   */
  @Override
  public int run(String[] args) throws Exception {
    // Validate the command line before touching the configuration or creating
    // a Job, and report the problem through the return value rather than
    // System.exit() so that whoever invoked this Tool decides how to terminate.
    if (args.length != 2) {
      System.err.println("Usage: LogParserMapReduce <input dir> <output dir>");
      return USAGE_ERROR;
    }

    Configuration conf = getConf();

    // NOTE(review): with zero reduce tasks (see below) there is presumably no
    // intermediate map output to compress, so these two settings may have no
    // effect - confirm before relying on them.
    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    // Presumably retains the working files of failed task attempts for
    // post-mortem debugging - verify against the Hadoop version in use.
    conf.setBoolean("keep.failed.task.files", true);

    Job job = Job.getInstance(conf);

    // Identify the jar to ship to the cluster by a class it contains.
    job.setJarByClass(LogParserMapReduce.class);

    // Name shown in reports and logs.
    job.setJobName("LogParser");

    // Input and output locations come straight from the command line; the
    // final job output is gzip-compressed.
    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    // Map-only job: a mapper is set and the number of reduce tasks is zero.
    // No input/output format class is set, so the job uses the defaults.
    job.setMapperClass(LogParserMapper.class);
    job.setNumReduceTasks(0);

    // Key/value types of the job's output records.
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    LOG.info("LogParserMapReduce: waitingForCompletion");

    // Submit the job and block until it completes (verbose progress enabled).
    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

}

Solution

  • The solution turned out to be very simple (Amazon's customer support pointed me to it): I had to upgrade to the latest AMI (currently 3.1.0), which ships the latest Hadoop (2.4), and also make sure that I compiled the Java code against the same Hadoop version. I haven't seen this kind of problem since.