
Removing keys from a small dataframe which are present in a larger dataframe in pyspark/spark


I have two DataFrames:

  1. df_A is a few megabytes in size, with primary key P
  2. df_B is a few gigabytes in size and also has the column P

How can I efficiently remove all records from df_A whose P is present in df_B?

Currently I use plain Spark SQL to do this, without any optimisations:

df_A.createOrReplaceTempView('df_A_table')
df_B.createOrReplaceTempView('df_B_table')

spark.sql("""
select
*
from df_A_table
where df_A_table.P not in (
    select P from df_B_table
)
""")

Does a broadcast join make sense here? If so, how does it help? (I am just guessing it might work, but I'm not sure.)


Solution

  • Removing all records from df_A whose P is present in df_B can be done with a left anti join between df_A and df_B.

    Here is an example in which df_A and df_B are left-anti-joined on P:

    df_A:
    +---+---+
    |  P|  Q|
    +---+---+
    |  1| 10|
    |  2| 20|
    |  3| 30|
    |  4| 40|
    +---+---+
    
    df_B:
    +---+---+
    |  P|  S|
    +---+---+
    |  1|100|
    |  3|300|
    |  5|500|
    +---+---+
    

    Typically, for a join (or anti-join) between tables A and B, Spark has to shuffle both tables so that rows with the same value of P end up on the same executor. Shuffling a large table is expensive. A broadcast join avoids that: the larger table stays partitioned where it already is (it is not shuffled), and the entire smaller table is "broadcast", i.e. copied, to every executor, where it is joined locally with each partition of the larger table. Spark does this automatically when one side is smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default), and F.broadcast(...) requests it explicitly as a hint.

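    In your case df_A is the small DataFrame, so it is the one to broadcast. There is a catch: for a left anti join, Spark's broadcast hash join can only broadcast the right-hand side (here df_B), and broadcasting a multi-gigabyte df_B would defeat the purpose. You can get around this in two steps. First, find the keys of df_A that also occur in df_B with a left semi join that broadcasts df_A, so df_B itself is never shuffled. That key set is never larger than df_A, so it can then be broadcast cheaply into the left anti join:

    from pyspark.sql import functions as F

    # Step 1: keys of df_A that also appear in df_B. df_A sits on the right of a
    # left semi join, so it is broadcast and df_B is streamed without a shuffle.
    matched_keys = (
        df_B.select('P')
        .join(F.broadcast(df_A.select('P')), on='P', how='leftsemi')
        .distinct()  # collapse duplicate P values coming from df_B
    )

    # Step 2: drop those keys from df_A; matched_keys is at most as large as df_A.
    result = df_A.join(F.broadcast(matched_keys), on='P', how='leftanti')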
    

    Result:

    +---+---+
    |  P|  Q|
    +---+---+
    |  2| 20|
    |  4| 40|
    +---+---+