I have an existing table, which I'd like to append two columns to. I create a Spark dataframe:
spark_df = spark.createDataFrame(df)
Then I'd like to use MERGE INTO like so:
spark.sql(f"""MERGE INTO x.y AS m
USING {spark_df} AS s
ON m.id = s.id
WHEN MATCHED THEN
UPDATE SET m.id = s.id
WHEN NOT MATCHED
THEN INSERT (id, colnew_1) VALUES (id, spark_df[["myCol"]])""")
I get a syntax error: the f-string substitutes the DataFrame's repr into the SQL text, which isn't valid SQL. Is this functionality possible? I understand that the target must be a Delta table for the MERGE operation to be supported. However, I'm a bit confused about the sequence of events. For example, I can create a Delta table like so:
CREATE TABLE x.y_delta (id bigint, colnew_1 bigint) USING delta
However, this table is empty. I suppose an intermediate step is to copy the original table in full into this new Delta table, and then work against the Delta table, though I'm not convinced that this is right either.
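For what it's worth, a sketch of that copy step using CREATE TABLE AS SELECT, assuming the source table x.y exists (colnew_2 here is a hypothetical name for the second appended column):

```sql
-- Copy the original table's data into a new Delta table in one step,
-- then add the two new columns to the Delta table's schema.
CREATE TABLE x.y_delta USING delta AS SELECT * FROM x.y;
ALTER TABLE x.y_delta ADD COLUMNS (colnew_1 bigint, colnew_2 bigint);
```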
As suggested by @blackbishop, create a temp view for the DataFrame:
df12.createOrReplaceTempView("temp_table1")
I followed the same suggestion and it works fine. Follow the steps below.
Code:
Sample DataFrame df12:
from pyspark.sql import types as T

df12 = spark.createDataFrame(
    [
        (1, "vam", 400),
        (2, "gov", 456),
    ],
    T.StructType(
        [
            T.StructField("id", T.IntegerType(), True),
            T.StructField("col1", T.StringType(), True),
            T.StructField("myCol", T.IntegerType(), True),
        ]
    ),
)
Create the Delta table and seed it with some existing rows:
spark.sql("CREATE TABLE x.y_delta2 (id int, col1 string, myCol int) USING delta")
spark.sql("insert into x.y_delta2 values (1,'govind',123),(3,'deep',456)")
Create a temp view:
df12.createOrReplaceTempView("temp_table1")
Merge operation:
spark.sql(f"""MERGE INTO x.y_delta2 AS m
USING temp_table1 AS s
ON m.id = s.id
WHEN MATCHED THEN
UPDATE SET m.id = s.id
WHEN NOT MATCHED
THEN INSERT (m.id,m.col1,m.myCol) VALUES (s.id,s.col1,s.myCol)""")