apache-sparkamazon-s3apache-spark-sqlamazon-emr

400 Bad Request error when trying to write to S3 from an EMR 7.0.0 cluster


I have an Spark application working flawlessly using emr-5.29.0 and Spark 2.4.4. This app writes to S3 using Spark SQL like this

df
    .repartition($"year", $"month", $"day")
    .write
    .partitionBy("year", "month", "day")
    .mode(saveMode)
    .parquet(path)

The path is a s3a valid one.

My problem comes when I upgraded to Spark 3.5.0 and emr-7.0.0. My jar is aligned with the provided dependencies provided by AWS like this:

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "3.4.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "org.apache.spark" %% "spark-streaming" % "3.5.0",
  "com.amazonaws" % "aws-java-sdk" % "1.12.569",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.6",
)

But when executing the corresponding step in the EMR 7.0.0 the job fails when trying to write in S3:

xception in thread "main" org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://x-bucket/raw/xxx: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: xxxx, Extended Request ID: xxxxxxxx):null: null (Service: S3, Status Code: 400, Request ID: xxxxx, Extended Request ID: gxxxxxxx)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:237)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:153)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3987)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3893)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4889)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2654)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2673)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4887)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:125)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:520)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:559)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:151)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
    at com.d.data.pipeline.utils.ParquetEventWriter.write(ParquetEventWriter.scala:13)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.getKafkaEventsByTopicBetweenTimestamps(KafkaToS3SparkApp.scala:182)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1(KafkaToS3SparkApp.scala:130)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1$adapted(KafkaToS3SparkApp.scala:120)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.processKafkaEventsInBetweenDates(KafkaToS3SparkApp.scala:120)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.main(KafkaToS3SparkApp.scala:75)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp.main(KafkaToS3SparkApp.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: JB2KN3QAMSCD504T, Extended Request ID: gcvyho7fncYObCK92QMAPIeanVQ8ShjPOoJNBE3dewSr18ZRBczNmIY00+sDe6lFjfjv+jShI7Q=)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95)
    at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:270)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
    at software.amazon.awssdk.services.s3.DefaultS3Client.headObject(DefaultS3Client.java:5495)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$8(S3AFileSystem.java:2847)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:431)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2835)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2807)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3965)

I'm using the eu-west-1 region.

The EMR is using the EMR_DefaultRole_v2 role, also to try to remove authz from the equation I added the AmazonS3FullAccess policy to the role but no dice, the error is still there.

I've also found this: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-access-grants.html but I don't know if that could be the answer given that I just set full access for S3.

Also if you know how to debug / check logs for the request id's that the s3 library throws it would be awesome :)

I've expected that the job will behave exactly like the emr-5.29.0 and Spark 2.4.4 one, but the error thrown is super opaque and difficult to debug.

Thanks In advance.


Solution

  • It worked by explicitly set the s3a endpoint like this:

    --conf spark.hadoop.fs.s3a.endpoint=s3.eu-west-1.amazonaws.com
    

    Hope it's helpful for other ppl :)