scala apache-spark

Get a field from an Apache Spark Row that is a WrappedArray (Seq) as a List using Scala


Background

DATA FORMAT

  val factories = """
      {
        "cities": {
          "name": "Sao Paulo",
          "areas": [
            {
              "code": "41939",
              "type": "downtown"
            },
            {
              "code": "48294",
              "type": "residential"
            }
          ]
        },

        "domains": [
            {
               "id": "19sk2nfb",
               "name": "defense"
            }
        ]
      }
  """

CODE

The code below reads the data from a Delta table and builds case class objects from it.

fetchedData is a DataFrame fetched using some criteria.

factoriesSchema is the schema of the JSON shown above.

import org.apache.spark.sql.functions.{col, from_json}

val structuredData =
  fetchedData.withColumn(
    "StructuredFactoryJson",
    from_json(col("FactoryData"), factoriesSchema)
  )
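
For illustration, the schema passed to from_json could be declared by hand as a StructType that mirrors the JSON above. This is only a sketch: the original factoriesSchema is not shown, so the object name FactorySchemaSketch, the intermediate schema names, and the choice of StringType for every leaf field are assumptions.

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Hypothetical container object; the real factoriesSchema is not shown in the post.
object FactorySchemaSketch {
  // One element of "areas": {"code": ..., "type": ...}
  val areaSchema: StructType = StructType(Seq(
    StructField("code", StringType),
    StructField("type", StringType)
  ))

  // The "cities" object: a name plus an array of areas
  val citiesSchema: StructType = StructType(Seq(
    StructField("name", StringType),
    StructField("areas", ArrayType(areaSchema))
  ))

  // One element of "domains": {"id": ..., "name": ...}
  val domainSchema: StructType = StructType(Seq(
    StructField("id", StringType),
    StructField("name", StringType)
  ))

  // Top-level schema, assumed to match the JSON sample
  val factoriesSchema: StructType = StructType(Seq(
    StructField("cities", citiesSchema),
    StructField("domains", ArrayType(domainSchema))
  ))
}
```

Fields declared with ArrayType of a StructType are the ones that later come back from a Row as a Seq of Row values.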

val factories = structuredData.collect().map { row =>
      val structJson = row.getAs[Row]("StructuredFactoryJson")
      val citiesRow = structJson.getAs[Row]("cities")
      val city = City(
        citiesRow.getAs[String]("name"),
        citiesRow
          .getAs[Seq[Row]]("areas")
          .map(areaRow =>
            Area(
              areaRow.getAs[String]("type"),
              areaRow.getAs[String]("code")
            )
          )
      )
      val domains = structJson
        .getAs[Seq[Row]]("domains")
        .map( area =>
           Area( area.getAs
             .
             .
             .

    }


Problem

This works fine and a Seq is obtained. The question is whether there is a way to get a List instead of a Seq, so that the larger object can be constructed as is.


Solution

  • After digging around a bit, I found two ways to accomplish this:

    1. Using JavaConverters

    This approach was discovered first, in an attempt to get a List from the Row instead of a Seq. However, the List returned by getList is a Java List (java.util.List), so it has to be converted to a Scala List.

    import scala.collection.JavaConverters._
    
    val factories = structuredData.collect().map { row =>
          val structJson = row.getAs[Row]("StructuredFactoryJson")
          val citiesRow = structJson.getAs[Row]("cities")
          val city = City(
            citiesRow.getAs[String]("name"),
            citiesRow
              .getList(citiesRow.fieldIndex("areas"))
              .asScala
              .map((areaRow : Row) =>
                Area(
                  areaRow.getAs[String]("type"),
                  areaRow.getAs[String]("code")
                )
              ).toList
          )
          val domains = structJson
            .getList(structJson.fieldIndex("domains"))
            .asScala
            .map((area : Row) =>
               Area( area.getAs
                 .
                 .
                 .
           ).toList
        }
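
    The Java-to-Scala conversion step can be looked at in isolation, without Spark. The sketch below builds a java.util.List by hand as a stand-in for what getList returns and converts it; the object and value names are made up for the illustration.

```scala
import scala.collection.JavaConverters._

// Hypothetical names, used only for this illustration.
object JavaListToScalaList {
  // Stand-in for the java.util.List that Row.getList returns
  val javaCodes: java.util.List[String] = java.util.Arrays.asList("41939", "48294")

  // asScala wraps the Java list as a mutable Buffer without copying;
  // toList then copies the elements into an immutable Scala List
  val scalaCodes: List[String] = javaCodes.asScala.toList
}
```

    On Scala 2.13 the same asScala call is available from scala.jdk.CollectionConverters._, and scala.collection.JavaConverters is deprecated there.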
    

    Issues

    2. Without JavaConverters

    Searching some more, I found a cleaner approach and settled on this one

    val factories = structuredData.collect().map { row =>
          val structJson = row.getAs[Row]("StructuredFactoryJson")
          val citiesRow = structJson.getAs[Row]("cities")
          val city = City(
            citiesRow.getAs[String]("name"),
            citiesRow
              .getSeq[Row](citiesRow.fieldIndex("areas"))
              .map((areaRow : Row) =>
                Area(
                  areaRow.getAs[String]("type"),
                  areaRow.getAs[String]("code")
                )
              ).toList
          )
          val domains = structJson
            .getSeq[Row](structJson.fieldIndex("domains"))
            .map((area : Row) =>
               Area( area.getAs
                 .
                 .
                 .
           ).toList
        }
    

    This resolved the issue
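
    The getSeq approach can also be tried on hand-built Row values without a Spark session. This is a sketch, assuming City and Area look like the case classes below (their real definitions are not shown here). Because a Row created with Row(...) carries no schema, the fields are read by position rather than by name.

```scala
import org.apache.spark.sql.Row

object GetSeqToListSketch {
  // Hypothetical case classes; the real City and Area are not shown in the post.
  case class Area(areaType: String, code: String)
  case class City(name: String, areas: List[Area])

  // Position 0 = name, position 1 = areas; each area row is (code, type)
  val citiesRow: Row = Row(
    "Sao Paulo",
    Seq(Row("41939", "downtown"), Row("48294", "residential"))
  )

  val city: City = City(
    citiesRow.getString(0),
    citiesRow
      .getSeq[Row](1) // Seq[Row], no Java collection involved
      .map(areaRow => Area(areaRow.getString(1), areaRow.getString(0)))
      .toList // convert the Seq to a Scala List
  )
}
```

    With a DataFrame produced by from_json the rows do carry a schema, so fieldIndex("areas") and getAs[String]("type") can be used as in the code above.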