Tags: scala, apache-spark, pyspark, user-defined-functions, scala-spark

Spark: merge two columns that are arrays of different structs with an overlapping field


I have a question I was unable to solve while working with Scala Spark (or PySpark): how can we merge two columns that are arrays of structs with different fields?

For example, if I have schema like so:

df.printSchema()
root
 |-- arrayOne: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |-- ArrayTwo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)
 |    |    |-- Q: string (nullable = true)

Can I use a UDF to create a DataFrame with the following schema?

df.printSchema()
root
 |-- arrayOne: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |-- ArrayTwo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |-- ArrayThree: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)

When a, b, and c are non-null, then x, y, and z are null, and vice versa. In either case, Q is non-null and holds the same value in both arrays.

The UDF is an important aspect here, because exploding both columns (with explode_outer) would be:

  1. Too expensive.
  2. A source of repeated elements from the second array, which would corrupt the fidelity of the data because of the shared field Q.

Writing a UDF in Pig Latin, or even in plain MapReduce, would be very easy, but for some reason it seems quite involved in the Spark environment, at least for me.

What would be a way to write a UDF that concatenates the two arrays, producing a new struct whose fields are the superset of the fields of the two different structs?


Solution

  • I will share below the solution that worked for me. It is a simple UDF that takes the two arrays of structs as input and produces a sequence of new structs whose fields are the superset of the two input structs' fields:

    import scala.collection.mutable.ListBuffer
    
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf
    
    case class ItemOne(a: String,
                       b: String,
                       c: String,
                       Q: String)
    
    case class ItemTwo(x: String,
                       y: String,
                       z: String,
                       Q: String)
    
    case class ItemThree(a: String,
                         b: String,
                         c: String,
                         x: String,
                         y: String,
                         z: String,
                         Q: String)
    
    
    val combineAuctionData = udf((arrayOne: Seq[Row], arrayTwo: Seq[Row]) => {
      val result = new ListBuffer[ItemThree]()
    
      // Map each ItemOne struct to an ItemThree; x, y, z stay null.
      // The fields are Strings, so the missing slots must be null
      // (not None, which is an Option and would not compile here).
      // Fields are read by name so we don't depend on struct field order.
      for (el <- arrayOne) {
        result += ItemThree(el.getAs[String]("a"),
                            el.getAs[String]("b"),
                            el.getAs[String]("c"),
                            null,
                            null,
                            null,
                            el.getAs[String]("Q"))
      }
    
      // Map each ItemTwo struct to an ItemThree; a, b, c stay null.
      for (el <- arrayTwo) {
        result += ItemThree(null,
                            null,
                            null,
                            el.getAs[String]("x"),
                            el.getAs[String]("y"),
                            el.getAs[String]("z"),
                            el.getAs[String]("Q"))
      }
    
      // Return an immutable Seq of ItemThree's
      result.toSeq
    }: Seq[ItemThree])
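
    Applying the UDF is then a one-liner. A minimal sketch, assuming `df` is the DataFrame from the question and `ArrayThree` is the name chosen for the new column (the column names `arrayOne` and `ArrayTwo` come from the schema above):

    import org.apache.spark.sql.functions.col
    
    // Add the merged column; each row's ArrayThree contains one
    // ItemThree per element of arrayOne followed by one per element
    // of ArrayTwo, with the non-applicable fields left null.
    val merged = df.withColumn("ArrayThree",
      combineAuctionData(col("arrayOne"), col("ArrayTwo")))
    
    merged.printSchema()

    Note that because the UDF returns a Seq of case class instances, Spark derives the struct type of `ArrayThree` from `ItemThree` automatically; all seven fields come out as nullable strings, matching the target schema.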