I have two dataframes, df1 and df2, without a common join column. I need to add a new column to df1 from df2 when a condition based on df2's columns is met. Let me explain with an example:
df1:
+--------+----------+
|label | raw |
+--------+----------+
|0.0 |-1.1088619|
|0.0 |-1.3188809|
|0.0 |-1.3051535|
+--------+----------+
df2:
+--------------------+----------+----------+
| probs | minRaw| maxRaw|
+--------------------+----------+----------+
| 0.1|-1.3195256|-1.6195256|
| 0.2|-1.6195257|-1.7195256|
| 0.3|-1.7195257|-1.8195256|
| 0.4|-1.8195257|-1.9188809|
+--------------------+----------+----------+
The expected output is a new column in df1 that gets df2.probs when the df1.raw value is between df2.minRaw and df2.maxRaw.
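In case it helps, both dataframes can be recreated like this (a minimal sketch; I'm assuming the numeric columns are plain doubles):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# values copied from the tables above
df1 = spark.createDataFrame(
    [(0.0, -1.1088619), (0.0, -1.3188809), (0.0, -1.3051535)],
    ["label", "raw"])

df2 = spark.createDataFrame(
    [(0.1, -1.3195256, -1.6195256),
     (0.2, -1.6195257, -1.7195256),
     (0.3, -1.7195257, -1.8195256),
     (0.4, -1.8195257, -1.9188809)],
    ["probs", "minRaw", "maxRaw"])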
My first approach was to explode the minRaw/maxRaw range and then join the dataframes, but those columns are continuous. The second idea is a UDF like this:
from pyspark.sql import functions as F

def get_probabilities(raw):
    # filter the probability table down to the row whose range contains this raw value
    df = isotonic_prob_table.filter((F.col("min_raw") >= raw) &
                                    (F.col("max_raw") <= raw)) \
                            .select("probs")
    df.show()
    # return df.select("probabilidad_bin").value()
    # return df.first()["probabilidad_bin"]
But it takes a long time on my large dataframe, and gives me these alerts:
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 82:> (0 + 1) / 1][Stage 83:====> (4 + 3) / 15]23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
If the value isn't between minRaw and maxRaw, the expected output is null, and df1 can have duplicates.
I have Spark version 2.4.7 and I'm not a PySpark expert. Thank you in advance for reading!
I think you can just join those dataframes with a between condition.
from pyspark.sql import functions as f

df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left').show(truncate=False)
+-----+-----+-----+----------+----------+
|label|raw |probs|minRaw |maxRaw |
+-----+-----+-----+----------+----------+
|0.0 |-1.1 |null |null |null |
|0.0 |-1.1 |null |null |null |
|0.0 |-1.32|0.1 |-1.3195256|-1.6195256|
|0.0 |-1.32|0.1 |-1.3195256|-1.6195256|
|0.0 |-1.73|0.3 |-1.7195257|-1.8195256|
|0.0 |-1.88|0.4 |-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+
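If you only need the probs column, you can select it and drop the range columns afterwards. Also, since df2 is tiny compared to df1, broadcasting it should keep this non-equi join as a cheap broadcast nested loop join instead of moving df1 around; a sketch of that variant (result is just an illustrative name):

from pyspark.sql import functions as f

result = (
    df1.join(
        f.broadcast(df2),  # hint: ship the small lookup table to every executor
        f.col('raw').between(f.col('maxRaw'), f.col('minRaw')),
        'left')
    .select('label', 'raw', 'probs'))  # keep only the columns you asked for

result.show(truncate=False)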