Tags: arrays, scala, dataframe, apache-spark, apache-spark-2.3

How to convert array of array (string type) to struct - Spark/Scala?


I have a dataframe as follows:

+---------------------------------------------------------------+---+
|family_name                                                    |id |
+---------------------------------------------------------------+---+
|[[John, Doe, Married, 999-999-9999],[Jane, Doe, Married,Wife,]]|id1|
|[[Tom, Riddle, Single, 888-888-8888]]                          |id2|
+---------------------------------------------------------------+---+
root
 |-- family_name: string (nullable = true)
 |-- id: string (nullable = true)

I wish to convert the column family_name to an array of named structs:

`family_name` array<struct<f_name:string,l_name:string,status:string,ph_no:string>>

I'm able to convert family_name to an array as shown below:

val sch = ArrayType(ArrayType(StringType))

val fam_array = data
  .withColumn("family_name_clean", regexp_replace($"family_name", "\\[\\[", "["))
  .withColumn("family_name_clean_clean1", regexp_replace($"family_name_clean", "\\]\\]", "]"))
  .withColumn("ar", toArray($"family_name_clean_clean1"))
  //.withColumn("ar1", from_json($"ar", sch))
fam_array.show(false)
fam_array.printSchema()

+---------------------------------------------------------------+---+--------------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------------------+
|family_name                                                    |id |family_name_clean                                             |family_name_clean_clean1                                     |ar                                                                     |
+---------------------------------------------------------------+---+--------------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------------------+
|[[John, Doe, Married, 999-999-9999],[Jane, Doe, Married,Wife,]]|id1|[John, Doe, Married, 999-999-9999],[Jane, Doe, Married,Wife,]]|[John, Doe, Married, 999-999-9999],[Jane, Doe, Married,Wife,]|[[John,  Doe,  Married,  999-999-9999], [Jane,  Doe,  Married, Wife, ]]|
|[[Tom, Riddle, Single, 888-888-8888]]                          |id2|[Tom, Riddle, Single, 888-888-8888]]                          |[Tom, Riddle, Single, 888-888-8888]                          |[[Tom,  Riddle,  Single,  888-888-8888]]                               |
+---------------------------------------------------------------+---+--------------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------------------+
root
 |-- family_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- family_name_clean: string (nullable = true)
 |-- family_name_clean_clean1: string (nullable = true)
 |-- ar: array (nullable = true)
 |    |-- element: string (containsNull = true)

 

sch is a schema variable of the desired type.

How do I convert the column ar to array<struct<>>?

EDIT:

I'm using Spark 2.3.2


Solution

  • To create an array of structs from an array of arrays of strings, use the struct function to build a struct from a list of columns, combined with the element_at function to extract the element at a specific index of an array.

    To solve your specific problem, as you correctly stated, you need to do two things: first convert the string into an array of arrays of strings, then convert that array into an array of structs.

    In Spark 3.0 and greater

    With Spark 3.0, both steps can be done with Spark built-in functions.

    For the first step, remove the leading [[ and trailing ]] with regexp_replace, split the result on ],[ to get one string per inner array, then use transform to split each of those strings on the comma.

    For the second step, use the struct function to build each struct, picking elements from the inner arrays with the element_at function.

    The complete code for Spark 3.0 and greater is as follows, with data as the input dataframe:

    import org.apache.spark.sql.functions.{col, element_at, regexp_replace, split, struct, transform}
    
    val result = data
      .withColumn(
        "family_name", 
        transform( 
          split( // first level split
            regexp_replace(col("family_name"), "\\[\\[|]]", ""), // remove [[ and ]]
            "],\\["
          ), 
          x => split(x, ",") // split for each element in first level array
        )
      )
      .withColumn("family_name", transform(col("family_name"), x => struct(
        element_at(x, 1).as("f_name"), // index starts at 1
        element_at(x, 2).as("l_name"),
        element_at(x, 3).as("status"),
        element_at(x, -1).as("ph_no"), // get last element of array
      )))
    

    In Spark 2.X

    Using Spark 2.X, we have to rely on a user-defined function. First, we need to define a case class that represents our struct:

    case class FamilyName(
      f_name: String, 
      l_name: String, 
      status: String, 
      ph_no: String
    )
    

    Then, we define our user-defined function and apply it to our input dataframe:

    import org.apache.spark.sql.functions.{col, udf}
    
    // Parse the raw string into one FamilyName per inner array
    val extract_array = udf((familyName: String) => familyName
      .replaceAll("\\[\\[|]]", "") // remove [[ and ]]
      .split("],\\[") // one string per inner array
      .map(entry => {
        val fields = entry.split(",", -1) // limit -1 keeps trailing empty strings
        FamilyName(
          f_name = fields(0),
          l_name = fields(1),
          status = fields(2),
          ph_no = fields(fields.length - 1) // last element
        )
      })
    )
    
    val result = data.withColumn("family_name", extract_array(col("family_name")))
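
    The UDF above assumes every row is non-null and every inner array has at least three fields; a null family_name (the column is nullable) or a shorter entry would make it throw. A minimal sketch of a more defensive parser follows, in plain Scala so it can be tried without a Spark session. The name parseFamilyNameSafe, the trimming of each field, and the choice to return null for missing fields are assumptions for illustration, not part of the original answer.

```scala
case class FamilyName(f_name: String, l_name: String, status: String, ph_no: String)

// Hypothetical null-safe variant of the UDF body (illustrative, not from the original answer).
def parseFamilyNameSafe(raw: String): List[FamilyName] =
  Option(raw).toList.flatMap { value =>       // a null input yields an empty list
    value
      .replaceAll("\\[\\[|]]", "")            // remove [[ and ]]
      .split("],\\[")                         // one string per inner array
      .toList
      .map { entry =>
        // limit -1 keeps trailing empty strings; trimming is an added assumption
        val fields = entry.split(",", -1).map(_.trim)
        FamilyName(
          f_name = fields.lift(0).orNull,     // null when the field is missing
          l_name = fields.lift(1).orNull,
          status = fields.lift(2).orNull,
          // take the last element as the phone number only when there is a fourth field
          ph_no = if (fields.length > 3) fields.last else null
        )
      }
  }
```

    If this behaviour suits your data, it could be wrapped with udf(parseFamilyNameSafe _) and applied the same way as extract_array. Note that trimming removes the leading spaces that appear in the result shown below.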
    

    Result

    If you have the following data dataframe:

    +---------------------------------------------------------------+---+
    |family_name                                                    |id |
    +---------------------------------------------------------------+---+
    |[[John, Doe, Married, 999-999-9999],[Jane, Doe, Married,Wife,]]|id1|
    |[[Tom, Riddle, Single, 888-888-8888]]                          |id2|
    +---------------------------------------------------------------+---+
    

    You get the following result dataframe:

    +-----------------------------------------------------------------+---+
    |family_name                                                      |id |
    +-----------------------------------------------------------------+---+
    |[{John,  Doe,  Married,  999-999-9999}, {Jane,  Doe,  Married, }]|id1|
    |[{Tom,  Riddle,  Single,  888-888-8888}]                         |id2|
    +-----------------------------------------------------------------+---+
    

    with the following schema:

    root
     |-- family_name: array (nullable = true)
     |    |-- element: struct (containsNull = false)
     |    |    |-- f_name: string (nullable = true)
     |    |    |-- l_name: string (nullable = true)
     |    |    |-- status: string (nullable = true)
     |    |    |-- ph_no: string (nullable = true)
     |-- id: string (nullable = true)
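
    The second entry of the id1 row has five comma-separated fields, the last of which is empty, which is why its ph_no comes out empty rather than as "Wife". This can be checked in plain Scala without Spark. The sketch below uses a hypothetical helper, parseFamilyName, that mirrors the body of the extract_array UDF; the helper name is an assumption for illustration.

```scala
case class FamilyName(f_name: String, l_name: String, status: String, ph_no: String)

// Hypothetical helper mirroring the body of the extract_array UDF.
def parseFamilyName(raw: String): List[FamilyName] =
  raw
    .replaceAll("\\[\\[|]]", "")         // remove [[ and ]]
    .split("],\\[")                      // one string per inner array
    .toList
    .map { entry =>
      val fields = entry.split(",", -1)  // limit -1 keeps the trailing empty string
      FamilyName(fields(0), fields(1), fields(2), fields(fields.length - 1))
    }

val parsed = parseFamilyName("[[John, Doe, Married, 999-999-9999],[Jane, Doe, Married,Wife,]]")
val jane = parsed(1) // fields are not trimmed, so " Doe" keeps its leading space
```

    Here jane.ph_no is the empty string, because the last element of the split array is the empty text after the final comma.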