I get this error in PySpark when I run a SQL MERGE against a Delta table:
py4j.protocol.Py4JJavaError: An error occurred while calling o64.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:744)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:875)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:504)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:165)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:165)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:178)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:308)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:606)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:308)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:323)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:277)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:256)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Here is the SQL MERGE query:
merge into re_obu_logistic_status_cockpit_report
using (
select *
from re_data_to_merge
) info_to_merge
on (
re_obu_logistic_status_cockpit_report.date = info_to_merge.date
and re_obu_logistic_status_cockpit_report.status_id = info_to_merge.status_id
and re_obu_logistic_status_cockpit_report.materialnumber = info_to_merge.materialnumber
and re_obu_logistic_status_cockpit_report.sales_partner_id = info_to_merge.sales_partner_id
)
when matched
then update set re_obu_logistic_status_cockpit_report.status = info_to_merge.status
,re_obu_logistic_status_cockpit_report.sales_partner_name = info_to_merge.sales_partner_name
,re_obu_logistic_status_cockpit_report.obu_count = info_to_merge.obu_count
,re_obu_logistic_status_cockpit_report.obu_count_pct = info_to_merge.obu_count_pct
,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz = info_to_merge.dan_last_changed_tsz
,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch = info_to_merge.dan_last_changed_epoch
when not matched
then insert (
re_obu_logistic_status_cockpit_report.date
,re_obu_logistic_status_cockpit_report.status_id
,re_obu_logistic_status_cockpit_report.status
,re_obu_logistic_status_cockpit_report.materialnumber
,re_obu_logistic_status_cockpit_report.sales_partner_id
,re_obu_logistic_status_cockpit_report.sales_partner_name
,re_obu_logistic_status_cockpit_report.obu_count
,re_obu_logistic_status_cockpit_report.obu_count_pct
,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz
,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch
)
values (
info_to_merge.date
,info_to_merge.status_id
,info_to_merge.status
,info_to_merge.materialnumber
,info_to_merge.sales_partner_id
,info_to_merge.sales_partner_name
,info_to_merge.obu_count
,info_to_merge.obu_count_pct
,info_to_merge.dan_last_changed_tsz
,info_to_merge.dan_last_changed_epoch
)
I started a new EMR cluster, removed the Delta tables from S3, and relaunched the Python application. I also restarted the EC2 instance that submits the application to the EMR cluster.
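This was caused by the wrong quote character in the spark-submit command: the --conf values were wrapped in typographic (curly) quotes “ ” instead of straight ASCII quotes ". The shell does not treat curly quotes as quoting characters, so they stay inside the argument and become part of the configuration key and value. As a result the Delta SQL extension and catalog settings are not applied as intended, and Spark falls back to its built-in planner, which rejects MERGE INTO. The incorrect spark-submit was: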
spark-submit \
--master yarn \
--driver-memory 8g \
--executor-memory 8g \
--executor-cores 8 \
--packages io.delta:delta-core_2.12:2.0.0 \
--conf “spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension” \
--conf “spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog” \
--conf “spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED” \
--conf “spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED” \
--conf “spark.sql.legacy.timeParserPolicy=LEGACY” \
--conf spark.rpc.askTimeout=600s \
main.py
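Curly quotes are easy to miss by eye, so it can help to scan the command text before running it. The following is a minimal sketch, not part of the original setup: the helper name find_typographic_quotes and the sample command strings are made up for illustration, and it only checks for the four most common typographic quote characters.

```python
# Typographic quotes that editors and chat tools often substitute
# for the straight ASCII quotes the shell expects.
TYPOGRAPHIC_QUOTES = {
    "\u201c": "left double quotation mark",
    "\u201d": "right double quotation mark",
    "\u2018": "left single quotation mark",
    "\u2019": "right single quotation mark",
}


def find_typographic_quotes(command):
    """Return (line_number, column, description) for each curly quote found.

    Hypothetical helper: line and column numbers are 1-based.
    """
    hits = []
    for line_number, line in enumerate(command.splitlines(), start=1):
        for column, char in enumerate(line, start=1):
            if char in TYPOGRAPHIC_QUOTES:
                hits.append((line_number, column, TYPOGRAPHIC_QUOTES[char]))
    return hits


# Shortened versions of the two commands, written with escapes so the
# difference between the quote characters is explicit.
bad_command = (
    "spark-submit \\\n"
    "--conf \u201cspark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension\u201d \\\n"
    "main.py"
)
good_command = (
    "spark-submit \\\n"
    '--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \\\n'
    "main.py"
)

for hit in find_typographic_quotes(bad_command):
    print(hit)
```

An empty result for a command means none of the four listed characters are present; it does not prove the command is otherwise correct.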
The correct spark-submit is:
spark-submit \
--master yarn \
--driver-memory 8g \
--executor-memory 8g \
--executor-cores 8 \
--packages io.delta:delta-core_2.12:2.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED" \
--conf "spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED" \
--conf "spark.sql.legacy.timeParserPolicy=LEGACY" \
--conf spark.rpc.askTimeout=600s \
main.py
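To confirm that the two Delta settings actually reached the session, one option is to inspect the effective configuration at the start of main.py. This is a rough sketch under assumptions: the helper name missing_delta_conf and the sample dictionaries are illustrative, and in a real job the dictionary would presumably come from something like dict(spark.sparkContext.getConf().getAll()).

```python
# The two settings from the spark-submit command that enable Delta SQL support.
REQUIRED_DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def missing_delta_conf(conf):
    """Return the required settings that are absent or wrong in conf.

    Hypothetical helper: conf is a plain dict of Spark settings, and the
    result maps each problematic key to the value that was found (or None).
    """
    problems = {}
    for key, expected in REQUIRED_DELTA_CONF.items():
        actual = conf.get(key)
        # spark.sql.extensions may hold a comma-separated list of extensions.
        values = [v.strip() for v in actual.split(",")] if actual else []
        if expected not in values:
            problems[key] = actual
    return problems


# Example: a session where the Delta settings did not arrive
# (illustrative values, not captured from a real cluster).
broken_conf = {"spark.rpc.askTimeout": "600s"}

# Example: a session started with the corrected command.
working_conf = {
    "spark.rpc.askTimeout": "600s",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

print(missing_delta_conf(broken_conf))
print(missing_delta_conf(working_conf))
```

Failing fast when this check reports a problem gives a clearer message than the UnsupportedOperationException raised later by the MERGE statement.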