join, pyspark, apache-spark-sql

Calculating a new column in spark df based on another spark df without an explicit join column


I have two dataframes, df1 and df2, without a common crossover column. I need to add a new column to df1, taken from df2, when a condition based on df2's columns is met. I'll try to explain with an example:

df1:

+--------+----------+
|label   |    raw   |
+--------+----------+
|0.0     |-1.1088619|
|0.0     |-1.3188809|
|0.0     |-1.3051535|
+--------+----------+

df2:

+--------------------+----------+----------+
|    probs           |    minRaw|    maxRaw|
+--------------------+----------+----------+
|                 0.1|-1.3195256|-1.6195256|
|                 0.2|-1.6195257|-1.7195256|
|                 0.3|-1.7195257|-1.8195256|
|                 0.4|-1.8195257|-1.9188809|
+--------------------+----------+----------+

The expected output is a new column in df1 that gets df2.probs when the df1.raw value is between df2.minRaw and df2.maxRaw.

My first approach was to try to explode the range between minRaw and maxRaw and then join the dataframes, but those columns are continuous. My second idea was a UDF like this:

from pyspark.sql import functions as F

def get_probabilities(raw):
    # look up the row of the probability table whose [max_raw, min_raw] range contains raw
    df = isotonic_prob_table.filter((F.col("min_raw") >= raw) &
                                    (F.col("max_raw") <= raw)) \
                            .select("probs")
    df.show()
    # return df.select("probabilidad_bin").value()
    # return df.first()["probabilidad_bin"]

But it takes a long time on my large dataframe, and it gives me these alerts:

23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 82:>                 (0 + 1) / 1][Stage 83:====>            (4 + 3) / 15]23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

If the value isn't between minRaw and maxRaw, the expected output is null, and df1 can have duplicates.
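
In case it clarifies the intent, this is roughly the per-row lookup I'm trying to express (just a sketch, assuming df2 is small enough to collect and broadcast, that probs is a double, and that spark is my SparkSession; I don't know if this is the right way to do it):

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# collect the small ranges table once instead of filtering a dataframe per row
ranges = [(r["minRaw"], r["maxRaw"], r["probs"]) for r in df2.collect()]
bc_ranges = spark.sparkContext.broadcast(ranges)

@F.udf(returnType=DoubleType())
def get_probabilities(raw):
    if raw is None:
        return None
    for min_raw, max_raw, probs in bc_ranges.value:
        # minRaw is the upper bound and maxRaw the lower bound in my table
        if max_raw <= raw <= min_raw:
            return probs
    return None   # not in any range -> null

df1 = df1.withColumn("probs", get_probabilities(F.col("raw")))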

I have Spark version 2.4.7 and I'm not a PySpark expert. Thank you in advance for reading!


Solution

  • I think you can just join those dataframes with a between condition.

    df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left').show(truncate=False)
    
    +-----+-----+-----+----------+----------+
    |label|raw  |probs|minRaw    |maxRaw    |
    +-----+-----+-----+----------+----------+
    |0.0  |-1.1 |null |null      |null      |
    |0.0  |-1.1 |null |null      |null      |
    |0.0  |-1.32|0.1  |-1.3195256|-1.6195256|
    |0.0  |-1.32|0.1  |-1.3195256|-1.6195256|
    |0.0  |-1.73|0.3  |-1.7195257|-1.8195256|
    |0.0  |-1.88|0.4  |-1.8195257|-1.9188809|
    +-----+-----+-----+----------+----------+
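
    For reference, here is a self-contained sketch of the same join (it assumes a local SparkSession; df2 is rebuilt from the question and df1 mimics the rows behind the output above). The broadcast hint is optional, but a non-equi join like this is typically executed as a broadcast nested loop join, so it helps to make sure the small df2 is the broadcast side:

    from pyspark.sql import SparkSession, functions as f

    spark = SparkSession.builder.master('local[*]').getOrCreate()

    # df1 with duplicates, as in the output above; df2 taken from the question
    df1 = spark.createDataFrame(
        [(0.0, -1.1), (0.0, -1.1), (0.0, -1.32), (0.0, -1.32), (0.0, -1.73), (0.0, -1.88)],
        ['label', 'raw'])
    df2 = spark.createDataFrame(
        [(0.1, -1.3195256, -1.6195256), (0.2, -1.6195257, -1.7195256),
         (0.3, -1.7195257, -1.8195256), (0.4, -1.8195257, -1.9188809)],
        ['probs', 'minRaw', 'maxRaw'])

    # between(lowerBound, upperBound): maxRaw is passed first because it is the
    # smaller (more negative) bound, so the condition is maxRaw <= raw <= minRaw
    df1.join(
        f.broadcast(df2),   # df2 is tiny; broadcasting keeps the nested-loop join cheap
        f.col('raw').between(f.col('maxRaw'), f.col('minRaw')),
        'left'
    ).show(truncate=False)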