Tags: scala, dataframe, apache-spark, apache-spark-sql, uber-api

Find the minimum by iterating one row of a dataframe over all the rows of another dataframe


Let's say I have the following two dataframes:

DF1:
+----------+----------+----------+
|     Place|Population|    IndexA|     
+----------+----------+----------+
|         A|       Int|       X_A|
|         B|       Int|       X_B|
|         C|       Int|       X_C|
+----------+----------+----------+

DF2:
+----------+----------+
|      City|    IndexB|     
+----------+----------+
|         D|       X_D|      
|         E|       X_E|   
|         F|       X_F|  
|      ....|      ....|
|        ZZ|      X_ZZ|
+----------+----------+

In practice, the dataframes above are much larger.
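
For a reproducible setup, the two dataframes can be created along these lines (a minimal sketch: the index values below are placeholders rather than real H3 cell indexes, and a SparkSession named spark is assumed to be in scope):

import spark.implicits._

// Placeholder data only; the IndexA/IndexB values are stand-ins, not real H3 cell indexes.
val DF1 = Seq(
  ("A", 1000, 1L),
  ("B", 2000, 2L),
  ("C", 3000, 3L)
).toDF("Place", "Population", "IndexA")

val DF2 = Seq(
  ("D", 4L),
  ("E", 5L),
  ("F", 6L)
).toDF("City", "IndexB")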

For every Place in DF1, I want to determine which City in DF2 is closest. The distance can be calculated from the indexes, so for every row in DF1 I have to iterate over every row in DF2 and look for the shortest distance based on the index calculation. For the distance calculation, the following UDF is defined:

val distance = udf(
      (indexA: Long, indexB: Long) => {
        h3.instance.h3Distance(indexA, indexB)
      })

I tried the following:

val output =  DF1.agg(functions.min(distance(col("IndexA"), DF2.col("IndexB"))))

The code compiles, but at runtime I get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s)
H3Index#220L missing from Places#316,Population#330,IndexAx#338L in operator !Aggregate
[min(if ((isnull(IndexA#338L) OR isnull(IndexB#220L))) null else UDF(knownnotnull(IndexA#338L), knownnotnull(IndexB#220L))) AS min(UDF(IndexA, IndexB))#346].

So I suppose I'm doing something wrong when iterating over each row of DF2 for a given row of DF1, but I couldn't find a solution.

What am I doing wrong? And am I heading in the right direction?


Solution

  • You are getting this error because the IndexB column you are referencing only exists in DF2, not in DF1, which is where you are attempting to perform the aggregation.

    In order to make this column accessible and determine the distance from all points, you would need to:

    1. Cross join DF1 and DF2 so that every index of DF1 is matched with every index of DF2
    2. Determine the distance using your UDF
    3. Find the min distance on this new cross-joined dataframe

    This may look like:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, min, udf}
    
    val distance = udf(
          (indexA: Long, indexB: Long) => {
            h3.instance.h3Distance(indexA, indexB)
          })
    
    val resultDF = DF1.crossJoin(DF2)
        .withColumn("distance", distance(col("IndexA"), col("IndexB")))
        // Instead of a groupBy followed by matching the aggregated min distance back onto
        // the initial df, a window function computes the min_distance of each group
        // (partitioned by Place), so we can simply keep the city with the min distance
        // to each place.
        .withColumn("min_distance", min("distance").over(Window.partitionBy("Place")))
        .where(col("distance") === col("min_distance"))
        .drop("min_distance")
    

    This will result in a dataframe with the columns from both dataframes and an additional distance column.
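
    The comment in the snippet above mentions the groupBy alternative; for completeness, here is a sketch of that equivalent approach (aggregate the min distance per Place, then join back onto the cross-joined dataframe to recover the matching City):

    val crossed = DF1.crossJoin(DF2)
        .withColumn("distance", distance(col("IndexA"), col("IndexB")))

    // Minimum distance per Place.
    val minPerPlace = crossed
        .groupBy("Place")
        .agg(min("distance").as("min_distance"))

    // Join the per-Place minimum back to keep only the closest City for each Place.
    val resultViaGroupBy = crossed
        .join(minPerPlace, Seq("Place"))
        .where(col("distance") === col("min_distance"))
        .drop("min_distance")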

    NB. Your current approach, which compares every item in one df to every item in another df, is an expensive operation. If you have the opportunity to filter early (e.g. joining on heuristic columns, i.e. other columns that indicate a place is likely to be close to a city), this is recommended; see the sketch below.
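
    One way to filter early with H3 specifically (a sketch only, assuming the h3-java 3.x API where H3Core exposes h3ToParent; in 4.x the method is called cellToParent) is to bucket both sides by a coarser parent cell and join on that bucket instead of cross joining everything. Note this is a heuristic pre-filter: the true nearest city can fall just across a bucket boundary.

    // Hypothetical coarse resolution used only for bucketing; tune it to your data.
    val parentRes = 3

    // Assumes h3.instance is the same H3Core instance used in the distance UDF.
    val toParent = udf((index: Long) => h3.instance.h3ToParent(index, parentRes))

    // Only pairs that fall in the same coarse parent cell are compared.
    val prefilteredDF = DF1.withColumn("bucket", toParent(col("IndexA")))
        .join(DF2.withColumn("bucket", toParent(col("IndexB"))), Seq("bucket"))
        .withColumn("distance", distance(col("IndexA"), col("IndexB")))
        .withColumn("min_distance", min("distance").over(Window.partitionBy("Place")))
        .where(col("distance") === col("min_distance"))
        .drop("bucket", "min_distance")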

    Let me know if this works for you.