apache-sparkpysparkapache-spark-sqlunionanti-join

Anti join followed by union in Spark SQL


I am running PySpark script in which I am doing anti join & union of 2 dataframes. But I want to do it in Spark SQL.

df_src:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    110|     NM|
|    115|     AB|
+-------+-------+

df_lkp:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    106|     XZ|
+-------+-------+

We have two dataframes: df_src & df_lkp. I am extracting unmatched records from df_src:

df_unmatched = df_src.join(df_lkp, on=column_nm, how='left_anti')

It is giving this result:

df_unmatched

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    115|     AB|
+-------+-------+

But I want to do this part using Spark SQL. I have created temporary view vw_df_src & vw_df_lkp and trying to run the following query, but not getting the result.

unmatched_sql = "SELECT * from vw_df_src where {0} in (select {0} from vw_df_src minus select {0} from vw_df_lkp)".format('call_id')
df_unmatched = sqlContext.sql(unmatched_sql)

I am also doing union of both the dataframes and dropping duplicates. I am using below code:

df_src1 = df_lkp.union(df_src)
df_src1.show(10)
df_src2 = df_src1.dropDuplicates(['call_id'])

df_src2:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    100|     QC|
|    115|     AB|
|    106|     XZ|
|    105|     XY|
+-------+-------+

I want this to be done in Spark SQL too.

I am using the following code to create temp views:

df_src = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '\001').options(header='true',inferSchema='false').load(src_file_nm)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '\001').options(header='true',inferSchema='false').load(lkp_file)
df_lkp.createOrReplaceTempView('vw_df_lkp')

Solution

  • ANTI JOIN

    spark.sql(
    """select * from vw_df_src LEFT ANTI JOIN 
    
    vw_df_lkp  ON
    
    vw_df_src.call_nm= vw_df_lkp.call_nm """).show()
    
    
    +-------+-------+
    |call_id|call_nm|
    +-------+-------+
    |    115|     AB|
    |    110|     NM|
    +-------+-------+
    

    If running in a notebook cell not initialed as sql TRY

    %sql 
    select * from vw_df_src LEFT ANTI JOIN 
    
    vw_df_lkp  ON
    
    vw_df_src.call_nm= vw_df_lkp.call_nm 
    

    UNION

    In pyspark, union returns duplicates and you have to drop_duplicates() or use distinct(). In sql, union eliminates duplicates. The following will therefore do. Spark 2.0.0 unionall() retuned duplicates and union is the thing

    spark.sql(
    """select * from vw_df_src
    
    union
    
    select * from vw_df_lkp""" ).show()