
Add a tag to an array column in one DataFrame based on data from a second DataFrame


I have two DataFrames. The first has the columns model, cnd, age, tags (a repeated field, i.e. an array/list of strings), min and max; the second has a single main_model column.
I would like to add the MAIN tag to the tags list of the first DataFrame whenever the row's model value matches any model in the main-models DataFrame. The code is written in Scala. Example below:

INPUT:
DataFrame no. 1
+------+-----+----+-------+------+-----+
| model| cnd | age| tags  |  min | max |
+------+-----+----+-------+------+-----+
|  foo1|   xx|  10|  []   |   1  |  2  |
|  foo2|   yy|  20|  []   |   2  |  3  | 
|  foo3|   zz|  30|  []   |   3  |  4  | 
+------+-----+----+-------+------+-----+

DataFrame no. 2 (the list of main models to check the first DataFrame against)

+-----------+
| main_model|
+-----------+
|  foo1     |
|  foo3     | 
|  foo5     | 
+-----------+


OUTPUT:

+------+-----+----+-------+------+-----+
| model| cnd | age| tags  |  min | max |
+------+-----+----+-------+------+-----+
|  foo1|   xx|  10|[MAIN] |   1  |  2  |
|  foo2|   yy|  20|  []   |   2  |  3  | 
|  foo3|   zz|  30|[MAIN] |   3  |  4  | 
+------+-----+----+-------+------+-----+

I haven't been able to find a reasonable solution. So far I'm trying this:

    dataFrame1.join(dataFrame2, dataFrame1("model") === dataFrame2("main_model"), "left_outer")
      .select(
        dataFrame1("model"),
        dataFrame1("cnd"),
        dataFrame1("age"),
        when(dataFrame2("main_model").isNotNull, concat(dataFrame1("tags"), lit(", MAIN")))
          .otherwise(dataFrame1("tags"))
          .alias("tags"),
        dataFrame1("min"),
        dataFrame1("max")
      )
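For reference, the attempt above treats tags as a comma-separated string: concat receives an array column and a string literal, which Spark rejects as a type mismatch. Since Spark 2.4, concat also concatenates array columns as long as every argument is an array, so wrapping the literal in array(...) should make the original query work. A minimal sketch, assuming Spark 2.4+; the local session settings are illustrative and not from the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Illustrative local session; in spark-shell, use the existing `spark` instead
val spark = SparkSession.builder().master("local[*]").appName("concat-arrays").getOrCreate()
import spark.implicits._

// Same shape as the question's data; tags is array<string>
val dataFrame1 = Seq(
  ("foo1", "xx", 10, Seq.empty[String], 1, 2),
  ("foo2", "yy", 20, Seq.empty[String], 2, 3),
  ("foo3", "zz", 30, Seq.empty[String], 3, 4)
).toDF("model", "cnd", "age", "tags", "min", "max")
val dataFrame2 = Seq("foo1", "foo3", "foo5").toDF("main_model")

val result = dataFrame1
  .join(dataFrame2, dataFrame1("model") === dataFrame2("main_model"), "left_outer")
  .select(
    dataFrame1("model"),
    dataFrame1("cnd"),
    dataFrame1("age"),
    // Both arguments are array<string> now, so concat appends "MAIN" as a new element
    when(dataFrame2("main_model").isNotNull, concat(dataFrame1("tags"), array(lit("MAIN"))))
      .otherwise(dataFrame1("tags"))
      .alias("tags"),
    dataFrame1("min"),
    dataFrame1("max")
  )

result.show()
```

Unlike array_union, concat keeps duplicates, so a row whose tags already contained MAIN would get it a second time.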

Solution

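  • You were close: use array_union instead of concat. array_union (available since Spark 2.4) merges two array columns and drops duplicates, so the tag has to be passed as an array literal rather than the string ", MAIN".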

    import org.apache.spark.sql.functions._
    // toDF on local Seqs needs the session implicits (spark is predefined in spark-shell)
    import spark.implicits._
    
    val data = Seq(("foo1","xx",10,Array[String](),1,2),
                   ("foo2","yy",20,Array[String](),2,3),
                   ("foo3","zzxx",30,Array[String](),3,4))
    val df = data.toDF("model","cnd","age","tags","min","max")
    val modelData = Seq("foo1","foo3","foo5")
    val modelDf = modelData.toDF("main_model")
    
    // Left join keeps every row of df; main_model is null where the model has no match
    val joinedDf = df.join(modelDf, df("model") === modelDf("main_model"), "left")
    joinedDf.select(
      df("model"),
      df("cnd"),
      df("age"),
      // array_union needs two arrays, so the tag is passed as a one-element array literal
      when(modelDf("main_model").isNotNull, array_union(df("tags"), lit(Array("MAIN"))))
        .otherwise(df("tags"))
        .alias("tags"),
      df("min"),
      df("max")
    ).show()
    

And the output:

    +-----+----+---+------+---+---+
    |model| cnd|age|  tags|min|max|
    +-----+----+---+------+---+---+
    | foo1|  xx| 10|[MAIN]|  1|  2|
    | foo2|  yy| 20|    []|  2|  3|
    | foo3|zzxx| 30|[MAIN]|  3|  4|
    +-----+----+---+------+---+---+
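One caveat with the join: if modelDf lists the same model more than once, the left join returns one row per match, so that model would appear twice in the result; calling modelDf.distinct() before the join avoids this. When the main-model list is small, another option is to skip the join entirely and test membership with isin. A minimal sketch, assuming Spark 2.4+ (for array_union) and a list small enough to collect to the driver; the session settings and the names mainModels and tagged are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Illustrative local session; in spark-shell, use the existing `spark` instead
val spark = SparkSession.builder().master("local[*]").appName("isin-tag").getOrCreate()
import spark.implicits._

val df = Seq(
  ("foo1", "xx", 10, Seq.empty[String], 1, 2),
  ("foo2", "yy", 20, Seq.empty[String], 2, 3),
  ("foo3", "zz", 30, Seq.empty[String], 3, 4)
).toDF("model", "cnd", "age", "tags", "min", "max")
val modelDf = Seq("foo1", "foo3", "foo5").toDF("main_model")

// Assumes the main-model list is small enough to collect to the driver
val mainModels: Seq[String] = modelDf.select("main_model").distinct().as[String].collect().toSeq

// withColumn replaces tags in place; there is no join, so no row can be duplicated
val tagged = df.withColumn(
  "tags",
  when(col("model").isin(mainModels: _*), array_union(col("tags"), array(lit("MAIN"))))
    .otherwise(col("tags"))
)

tagged.show()
```

For a large main-model table, the join shown above (with modelDf deduplicated, or wrapped in broadcast(...)) is the safer choice, since collect() pulls every value onto the driver.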