python apache-spark amazon-s3 pyspark amazon-emr

PySpark error on EMR when writing Parquet files to S3


I have a process that reads data from S3, processes it, and then writes it back to S3 in another location in Parquet format. Sometimes I get this error while it is writing:

    py4j.protocol.Py4JJavaError: An error occurred while calling o426.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=17, partition=264) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(org.apache.spark.SparkException,[TASK_WRITE_FAILED] Task failed while writing rows to s3://bucket/path.,[Ljava.lang.StackTraceElement;@397e21a9,org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://bucket/path.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: 0RGE13WMZ76BMPW6; S3 Extended Request ID: up90NKdAy7UIp3Rep2+J293TUhfFcno8iG/Y7Qr8uZOLMMzrQAwrZrfKojzKsq5iKiuGPQLz9/g=; Proxy: null), S3 Extended Request ID: up90NKdAy7UIp3Rep2+J293TUhfFcno8iG/Y7Qr8uZOLMMzrQAwrZrfKojzKsq5iKiuGPQLz9/g=

The error is intermittent: it occurs in some executions but not in others. The EMR service role has permission to write to S3.


Solution

  • I know this is not the most helpful response, but IMO this is a bug in the EMR S3 connector, and AWS/EMR should own the fix. In your place, I would open a ticket with AWS Support and include a minimal reproducible example.

    The EMR S3 connector writes to S3 using multipart upload, described here: https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html

    With that API, S3 answers each part upload with "part uploaded, here is the ETag for this part". It is your job (or rather, in this case, the job of the EMR S3 connector provided by AWS) to keep track of every part number and its ETag. Once all the parts have been uploaded, you send a CompleteMultipartUpload request that lists the parts and ETags you believe you uploaded, so that S3 can verify that every expected part is present and the resulting object is not silently a partial file.

    The InvalidPart error in your trace means that the EMR S3 connector, for some reason, failed to keep a correct account of the parts it uploaded, perhaps because the upload is parallelized across the executors, perhaps for some other reason. I am with you that I would much rather work around or fix this myself, but I suspect you will have a hard time: the problem is internal to the S3 connector, and at the very least you would need more logging to understand what is happening and why. Hence I would open a ticket with AWS Support about this bug. Even if they do not fix it, they should at least surface an EMR-user-facing exception. This is not one: it is the raw S3 multipart upload exception that the EMR S3 connector receives from the S3 API.
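
To make the handshake above concrete, here is a minimal in-memory sketch of the bookkeeping that a multipart upload client has to get right. It is a toy model only: `ToyMultipartStore`, `InvalidPart`, `demo_success` and `demo_stale_etag` are hypothetical names invented for illustration, the ETag is just a truncated SHA-256 of the part, and nothing here calls real S3 or reflects how the EMR S3 connector is actually implemented. The second demo shows one plausible way the accounting could go wrong (a part re-uploaded after its ETag was recorded); whether that is what happens inside the connector is an assumption, not something the stack trace proves.

```python
import hashlib


class InvalidPart(Exception):
    """Toy stand-in for the S3 InvalidPart error seen in the trace."""


class ToyMultipartStore:
    """In-memory imitation of the multipart handshake (hypothetical, not real S3)."""

    def __init__(self):
        self._uploads = {}  # upload_id -> {part_number: (etag, data)}
        self._keys = {}     # upload_id -> object key
        self.objects = {}   # object key -> assembled bytes

    def create_multipart_upload(self, key):
        upload_id = f"upload-{len(self._keys) + 1}"
        self._uploads[upload_id] = {}
        self._keys[upload_id] = key
        return upload_id

    def upload_part(self, upload_id, part_number, data):
        # Toy ETag: any content fingerprint is enough for this illustration.
        etag = hashlib.sha256(data).hexdigest()[:16]
        # Re-uploading a part number replaces the earlier part and its ETag.
        self._uploads[upload_id][part_number] = (etag, data)
        return etag

    def complete_multipart_upload(self, upload_id, parts):
        stored = self._uploads[upload_id]
        # The store checks every (part_number, etag) pair the client claims.
        for part_number, etag in parts:
            if part_number not in stored or stored[part_number][0] != etag:
                raise InvalidPart(f"part {part_number} missing or ETag mismatch")
        key = self._keys[upload_id]
        self.objects[key] = b"".join(stored[n][1] for n, _ in sorted(parts))
        del self._uploads[upload_id]
        return key


def demo_success():
    """Correct accounting: every part's ETag is recorded, then sent back."""
    store = ToyMultipartStore()
    upload_id = store.create_multipart_upload("path/part-00000.parquet")
    manifest = []
    for number, chunk in enumerate([b"row group A;", b"row group B;"], start=1):
        manifest.append((number, store.upload_part(upload_id, number, chunk)))
    key = store.complete_multipart_upload(upload_id, manifest)
    return store.objects[key]


def demo_stale_etag():
    """Broken accounting: part 2 is re-uploaded, but the manifest keeps the old ETag."""
    store = ToyMultipartStore()
    upload_id = store.create_multipart_upload("path/part-00001.parquet")
    etag1 = store.upload_part(upload_id, 1, b"row group A;")
    etag2 = store.upload_part(upload_id, 2, b"row group B;")
    store.upload_part(upload_id, 2, b"row group B (retry);")  # new ETag is ignored
    try:
        store.complete_multipart_upload(upload_id, [(1, etag1), (2, etag2)])
    except InvalidPart:
        return True
    return False


if __name__ == "__main__":
    print(demo_success())
    print(demo_stale_etag())
```

The point of the sketch is that the completion call can only succeed if the client's list of parts and ETags matches what the store actually holds. Any drift between the two surfaces as the kind of raw InvalidPart failure shown in the question, which is why the fix has to come from whoever owns that bookkeeping.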