I have a Spark application that works flawlessly on emr-5.29.0 with Spark 2.4.4. The app writes to S3 using the Spark SQL DataFrame API like this:
df.repartition($"year", $"month", $"day")
  .write
  .partitionBy("year", "month", "day")
  .mode(saveMode)
  .parquet(path)
The path is a valid s3a:// one.
The problem appeared when I upgraded to Spark 3.5.0 and emr-7.0.0. My jar is aligned with the dependency versions that AWS provides, like this:
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "3.4.1",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
"org.apache.spark" %% "spark-core" % "3.5.0",
"org.apache.spark" %% "spark-sql" % "3.5.0",
"org.apache.spark" %% "spark-streaming" % "3.5.0",
"com.amazonaws" % "aws-java-sdk" % "1.12.569",
"org.apache.hadoop" % "hadoop-aws" % "3.3.6",
)
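(In case the build setup matters: on EMR the Spark and Hadoop artifacts are normally marked provided, since the cluster supplies them at runtime and bundling them into the assembly can pull in conflicting classes. A minimal sketch of that variant, assuming sbt-assembly; the versions are the ones listed above:)

libraryDependencies ++= Seq(
  // Supplied at runtime by the EMR 7.0.0 cluster itself
  "org.apache.spark" %% "spark-core" % "3.5.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.6" % "provided",
  // Bundled into the assembly jar because EMR does not ship them
  "org.apache.kafka" %% "kafka" % "3.4.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
)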
But when the corresponding step executes on EMR 7.0.0, the job fails while trying to write to S3:
Exception in thread "main" org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://x-bucket/raw/xxx: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: xxxx, Extended Request ID: xxxxxxxx):null: null (Service: S3, Status Code: 400, Request ID: xxxxx, Extended Request ID: gxxxxxxx)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:237)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:153)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3987)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3893)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4889)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2654)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2673)
at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4887)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:125)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:520)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:559)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:101)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:151)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
at com.d.data.pipeline.utils.ParquetEventWriter.write(ParquetEventWriter.scala:13)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.getKafkaEventsByTopicBetweenTimestamps(KafkaToS3SparkApp.scala:182)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1(KafkaToS3SparkApp.scala:130)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1$adapted(KafkaToS3SparkApp.scala:120)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.processKafkaEventsInBetweenDates(KafkaToS3SparkApp.scala:120)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.main(KafkaToS3SparkApp.scala:75)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp.main(KafkaToS3SparkApp.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: JB2KN3QAMSCD504T, Extended Request ID: gcvyho7fncYObCK92QMAPIeanVQ8ShjPOoJNBE3dewSr18ZRBczNmIY00+sDe6lFjfjv+jShI7Q=)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95)
at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:270)
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
at software.amazon.awssdk.services.s3.DefaultS3Client.headObject(DefaultS3Client.java:5495)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$8(S3AFileSystem.java:2847)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:431)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2835)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2807)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3965)
I'm using the eu-west-1 region.
The EMR cluster uses the EMR_DefaultRole_v2 role. To take authorization out of the equation, I also attached the AmazonS3FullAccess policy to the role, but no dice: the error is still there.
I've also found this: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-access-grants.html, but I don't know whether that could be the answer, given that I already granted full access to S3.
Also, if you know how to debug or check the logs for the request IDs that the S3 library reports, that would be awesome :)
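(In case it helps whoever answers: this is the logging I'd try to enable, a minimal sketch assuming EMR 7 / Spark 3.5 picks up a log4j2.properties. The logger names are the standard S3A and AWS SDK v2 ones; the SDK request logger prints request/response details, including request IDs, at debug level.)

# Verbose S3A filesystem client logging
logger.s3a.name = org.apache.hadoop.fs.s3a
logger.s3a.level = debug
# AWS SDK v2 request/response logging (includes request IDs)
logger.awssdk.name = software.amazon.awssdk.request
logger.awssdk.level = debug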
I expected the job to behave exactly like the emr-5.29.0 / Spark 2.4.4 one, but the error thrown is super opaque and hard to debug.
Thanks in advance.
It worked after explicitly setting the s3a endpoint like this:
--conf spark.hadoop.fs.s3a.endpoint=s3.eu-west-1.amazonaws.com
Hope it's helpful for other people :)
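For completeness, the same fix can be applied when building the SparkSession instead of on the command line (a sketch; the appName is just illustrative, and the config key is the same one used in the --conf flag above):

import org.apache.spark.sql.SparkSession

// The "spark.hadoop." prefix makes Spark forward fs.s3a.endpoint
// to the Hadoop configuration that the s3a filesystem reads.
val spark = SparkSession.builder()
  .appName("KafkaToS3SparkApp") // illustrative name
  .config("spark.hadoop.fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")
  .getOrCreate()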