I am running a PySpark script in which I do an anti join and a union of two DataFrames, but I want to do the same in Spark SQL.
df_src:
+-------+-------+
|call_id|call_nm|
+-------+-------+
| 100| QC|
| 105| XY|
| 110| NM|
| 115| AB|
+-------+-------+
df_lkp:
+-------+-------+
|call_id|call_nm|
+-------+-------+
| 100| QC|
| 105| XY|
| 106| XZ|
+-------+-------+
We have two dataframes: df_src & df_lkp. I am extracting the unmatched records from df_src (column_nm holds the name of the join column):
df_unmatched = df_src.join(df_lkp, on=column_nm, how='left_anti')
It is giving this result:
df_unmatched
+-------+-------+
|call_id|call_nm|
+-------+-------+
| 110| NM|
| 115| AB|
+-------+-------+
But I want to do this part using Spark SQL. I have created temporary views vw_df_src & vw_df_lkp and am trying to run the following query, but it is not giving the expected result.
unmatched_sql = "SELECT * from vw_df_src where {0} in (select {0} from vw_df_src minus select {0} from vw_df_lkp)".format('call_id')
df_unmatched = sqlContext.sql(unmatched_sql)
I am also doing a union of both DataFrames and dropping duplicates, using the code below:
df_src1 = df_lkp.union(df_src)
df_src1.show(10)
df_src2 = df_src1.dropDuplicates(['call_id'])
df_src2:
+-------+-------+
|call_id|call_nm|
+-------+-------+
| 110| NM|
| 100| QC|
| 115| AB|
| 106| XZ|
| 105| XY|
+-------+-------+
I want this to be done in Spark SQL too.
I am using the following code to create temp views:
df_src = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '\001').options(header='true',inferSchema='false').load(src_file_nm)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '\001').options(header='true',inferSchema='false').load(lkp_file)
df_lkp.createOrReplaceTempView('vw_df_lkp')
ANTI JOIN
spark.sql(
    """select * from vw_df_src LEFT ANTI JOIN
       vw_df_lkp ON
       vw_df_src.call_nm = vw_df_lkp.call_nm""").show()
+-------+-------+
|call_id|call_nm|
+-------+-------+
| 115| AB|
| 110| NM|
+-------+-------+
If you are running this in a notebook cell that is not initialized as SQL, try:
%sql
select * from vw_df_src LEFT ANTI JOIN
vw_df_lkp ON
vw_df_src.call_nm= vw_df_lkp.call_nm
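If you prefer a subquery form, the same anti join can also be written with NOT EXISTS, which Spark SQL supports. A sketch against the same views (same result as the LEFT ANTI JOIN above):

```sql
-- keep rows of vw_df_src whose call_nm has no match in vw_df_lkp
SELECT *
FROM vw_df_src s
WHERE NOT EXISTS (
  SELECT 1
  FROM vw_df_lkp l
  WHERE l.call_nm = s.call_nm
)
```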
UNION
In PySpark, union() returns duplicates, and you have to call dropDuplicates() or distinct() afterwards. In Spark SQL, UNION eliminates duplicates, so the following will do. (Since Spark 2.0.0, unionAll() is deprecated in favour of union(); both keep duplicates in the DataFrame API.)
spark.sql(
"""select * from vw_df_src
union
select * from vw_df_lkp""" ).show()
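One caveat: SQL UNION removes rows that are duplicated in every column, whereas dropDuplicates(['call_id']) de-duplicates on call_id alone. On your sample data the results coincide, but if you need the key-based behaviour in SQL, a window function is one way to sketch it (using the same views; the ORDER BY inside the window is arbitrary here, so which duplicate survives is not guaranteed):

```sql
-- keep one row per call_id from the combined data
SELECT call_id, call_nm
FROM (
  SELECT *,
         row_number() OVER (PARTITION BY call_id ORDER BY call_id) AS rn
  FROM (SELECT * FROM vw_df_lkp
        UNION ALL
        SELECT * FROM vw_df_src) u
) t
WHERE rn = 1
```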