scala apache-spark apache-spark-sql orientdb apache-spark-dataset

Convert a Spark Dataset column from a UDT to Array<String>


I'm using the Spark OrientDB connector to retrieve some data that looks like the following:

    character             | title
    Tony Stark            | ["Iron Man"]
    James Buchanan Barnes | ["Captain America: The First Avenger","Captain America: The Winter Soldier","Captain America: Civil War","Avengers: Infinity War"]
    Marcus Bledsoe        | ["Captain America: The Winter Soldier"]

The DataFrame's schema is [character: string, title: embeddedlist]. An EmbeddedList is a user-defined type (UDT).

I would like to treat the title as an Array<String> so that I can do the following:

    val vertices = df
      .select(explode(concat(array('character), 'title)) as "x")
      .distinct.rdd.map(_.getAs[String](0))
      .zipWithIndex.map(_.swap)

I'm not sure how to cast/convert the EmbeddedList correctly. Running this as-is results in the error: cannot resolve 'concat(array(name), out)' due to data type mismatch: input to function concat should have been string, binary or array, but it's [array<string>, array<string>]

Any help/pointers are appreciated.

Edit: The way I'm receiving the data is in this structure:

    val df: DataFrame = Seq(
      "Tony Stark" -> EmbeddedList(Array("Iron Man")),
      "James Buchanan Barnes" -> EmbeddedList(Array("Captain America: The First Avenger", "Captain America: The Winter Soldier", "Captain America: Civil War", "Avengers: Infinity War")),
      "Marcus Bledsoe" -> EmbeddedList(Array("Captain America: The Winter Soldier"))
    ).toDF("character", "title")

Solution

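  • I was able to solve this by casting the EmbeddedList column to a string and then splitting it on ", " (comma followed by a space). I have to believe there's a more elegant way to do this, but it works for now at least. Note that this breaks if a title itself contains ", ", since that title would be split into several elements.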

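        // Assumes a SparkSession named `spark` is in scope.
        import org.apache.spark.sql.functions.{array, col, concat, explode, split}
        import spark.implicits._

        val vertices = df
          // Cast the UDT column to its string form, then split it back into array<string>.
          .withColumn("title", col("title").cast("String"))
          .withColumn("title", split(col("title"), ", "))
          .select(explode(concat(array('character), 'title)) as "x")
          .distinct.rdd.map(_.getAs[String](0))
          .zipWithIndex.map(_.swap)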
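To make explicit what the pipeline is meant to produce once title is a real array of strings, here is a minimal sketch of the same steps on plain Scala collections, without Spark. The object name VertexIndexSketch and the value names are hypothetical, and the titles are written as Seq[String] on the assumption that the cast-and-split step has already succeeded. It is an illustration of the logic, not a replacement for the Spark code.

```scala
object VertexIndexSketch {
  // Sample rows from the question, with titles already as Seq[String]
  // (assumption: this is what the column holds after the cast-and-split step).
  val rows: Seq[(String, Seq[String])] = Seq(
    "Tony Stark" -> Seq("Iron Man"),
    "James Buchanan Barnes" -> Seq(
      "Captain America: The First Avenger",
      "Captain America: The Winter Soldier",
      "Captain America: Civil War",
      "Avengers: Infinity War"
    ),
    "Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
  )

  // Mirrors explode(concat(array(character), title)):
  // one element per character name and per title.
  val exploded: Seq[String] =
    rows.flatMap { case (character, titles) => character +: titles }

  // Mirrors distinct + zipWithIndex + swap:
  // each unique value gets a numeric id, with the id first in the pair.
  val vertices: Seq[(Long, String)] =
    exploded.distinct.zipWithIndex.map { case (name, idx) => (idx.toLong, name) }

  def main(args: Array[String]): Unit = vertices.foreach(println)
}
```

On local collections, distinct keeps the first occurrence of each value, so the ids follow input order. Spark's distinct makes no such ordering guarantee, so the ids assigned by the RDD version may differ from run to run; only their uniqueness should be relied on.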