
Convert a Spark Scala Dataset of one type to another


I have a Dataset with the following case class type:

  case class AddressRawData(
    addressId: String,
    customerId: String,
    address: String
  )

I want to convert it to:

  case class AddressData(
    addressId: String,
    customerId: String,
    address: String,
    number: Option[Int], // i.e. it is optional
    road: Option[String],
    city: Option[String],
    country: Option[String]
  )

Using a parser function:

  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
    )
  }

I am new to Scala and Spark. Could anyone please let me know how this can be done?


Solution

  • You were on the right path! There are of course multiple ways of doing this. But since you have already defined some case classes and started writing a parsing function, an elegant solution is to use the Dataset's map function. From the docs, the signature of map is the following:

    def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 
    

    Here T is the starting type (AddressRawData in your case) and U is the type you want to get to (AddressData in your case). So the input of this map function is a function that transforms an AddressRawData into an AddressData. That could well be the addressParser you've started writing!

    Now, your current addressParser has the following signature:

    def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]
    

    It works on a whole Seq, and it takes AddressData as input rather than AddressRawData. To feed it to map, it needs to transform a single record instead, with this signature:

    def newAddressParser(unparsedAddress: AddressRawData): AddressData
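
    Switching to a per-record function does not lose the collection-level version: it can be recovered by mapping. A minimal plain-Scala sketch (no Spark needed), assuming a placeholder parser body that leaves the parsed fields empty; parseAll is a hypothetical name, not something from your code:

    ```scala
    case class AddressRawData(addressId: String, customerId: String, address: String)
    case class AddressData(
      addressId: String,
      customerId: String,
      address: String,
      number: Option[Int],
      road: Option[String],
      city: Option[String],
      country: Option[String]
    )

    // Placeholder body (assumption): copies the raw fields and leaves the
    // parsed fields empty. The real parsing logic would go here.
    def newAddressParser(unparsedAddress: AddressRawData): AddressData =
      AddressData(
        unparsedAddress.addressId,
        unparsedAddress.customerId,
        unparsedAddress.address,
        None, None, None, None
      )

    // Hypothetical helper: the Seq-level version is just the per-record
    // function applied to every element with map.
    def parseAll(unparsed: Seq[AddressRawData]): Seq[AddressData] =
      unparsed.map(newAddressParser)

    val parsed = parseAll(
      Seq(AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"))
    )
    ```

    Dataset.map accepts the same kind of function. The difference is that it also needs an implicit Encoder for the result type, which import spark.implicits._ provides for case classes.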
    

    Putting all of this together, an example would be the following:

    import spark.implicits._
    import scala.util.Try
    
    // Your case classes
    case class AddressRawData(addressId: String, customerId: String, address: String)
    case class AddressData(
      addressId: String,
      customerId: String,
      address: String,
      number: Option[Int],
      road: Option[String],
      city: Option[String],
      country: Option[String]
    )
    
    // Your addressParser function, adapted to be able to feed into the Dataset.map
    // function
    def addressParser(rawAddress: AddressRawData): AddressData = {
      val addressArray = rawAddress.address.split(", ")
      AddressData(
        rawAddress.addressId,
        rawAddress.customerId,
        rawAddress.address,
        Try(addressArray(0).toInt).toOption,
        Try(addressArray(1)).toOption,
        Try(addressArray(2)).toOption,
        Try(addressArray(3)).toOption
      )
    }
    
    // Creating a sample dataset
    val rawDS = Seq(
      AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
      AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
    ).toDS
    
    val parsedDS = rawDS.map(addressParser)
    
    parsedDS.show
    +---------+----------+--------------------+------+-------------+----------------+-----------+
    |addressId|customerId|             address|number|         road|            city|    country|
    +---------+----------+--------------------+------+-------------+----------------+-----------+
    |        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
    |        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
    +---------+----------+--------------------+------+-------------+----------------+-----------+
    

    As you can see, because you had already anticipated that parsing can go wrong (the parsed fields are Options), it was easy to use scala.util.Try to extract the pieces of the raw address and add some robustness: the second row contains null values where the address string could not be parsed (the number is not an integer and the country is missing).
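
    To see what Try(...).toOption does in isolation, here is a small plain-Scala sketch (no Spark needed) using the badly formatted address from the sample data. The value names are only illustrative, and the lift variant is an alternative I am suggesting, not part of the code above:

    ```scala
    import scala.util.Try

    // Splits into 3 pieces: "badFormat", "some road", "cityButNoCountry"
    val parts = "badFormat, some road, cityButNoCountry".split(", ")

    // "badFormat".toInt throws NumberFormatException; Try catches it -> None
    val number = Try(parts(0).toInt).toOption

    // Index 1 exists -> Some("some road")
    val road = Try(parts(1)).toOption

    // Index 3 is out of bounds; Try catches the exception -> None
    val country = Try(parts(3)).toOption

    // Alternative for plain index lookups: lift returns None for a missing
    // index without throwing an exception in the first place.
    val countryViaLift = parts.lift(3)
    val numberViaLift = parts.lift(0).flatMap(s => Try(s.toInt).toOption)
    ```

    Keep in mind that Try catches every non-fatal exception, so it will also turn unexpected failures into None. If you only want to guard against missing pieces, lift is the narrower tool.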

    Hope this helps!