Tags: r, apache-spark, databricks, sparkr

SparkR: creating a Spark table with a nested data column


I'm working in Databricks, trying to push data from an R notebook into a Spark table where one column contains nested data. Here's a working example without the nesting:

library(SparkR)
sparkR.session()

d1 = data.frame(id = 1:3, name = c('x', 'y', 'z'))

# temp view
SparkR::dropTempView('temp1') # drop if it already exists
SparkR::createOrReplaceTempView(SparkR::as.DataFrame(d1), 'temp1')

my_schema1 = structType(structField("id", "double"), structField("name", "string"))

SparkR::createTable('hive_metastore.my_project.test1', schema = my_schema1)

# append data to spark table
SparkR::sql('INSERT INTO hive_metastore.my_project.test1 TABLE temp1;')

# test
SparkR::sql('SELECT * FROM hive_metastore.my_project.test1') |> SparkR::showDF()
+---+----+
| id|name|
+---+----+
|1.0|   x|
|2.0|   y|
|3.0|   z|
+---+----+

Now a nested data example:

# 2 functions to generate equivalent nested data structures
sub_fn1 = function(x) data.frame(key = base::sample(LETTERS, x), val = rnorm(x))
sub_fn2 = function(x) purrr::map2(base::sample(LETTERS, x), rnorm(x), ~ list(key = .x, val = .y))

d2 = dplyr::tibble(
  id = 1:3, name = c('x', 'y', 'z'),
  data1 = purrr::map(c(3, 5, 4), sub_fn1),
  data2 = purrr::map(c(3, 5, 4), sub_fn2)
) |> as.data.frame()

dplyr::glimpse(d2)
Rows: 3
Columns: 4
$ id    <int> 1, 2, 3
$ name  <chr> "x", "y", "z"
$ data1 <list> [<data.frame[3 x 2]>], [<data.frame[5 x 2]>], [<data.frame[4 x 2…
$ data2 <list> [["I", 0.6562561], ["N", -0.5147073], ["M", -0.4036189]], [["M",…

I'm unable to create a valid schema that reflects either of these data1/data2 fields, so I can't define the Spark table to append to. For example:

my_schema2 = structType(
  structField("id", "double"), 
  structField("name", "string"), 
  structField("data2", "array")
)
Error in checkType(type) : Unsupported type for SparkDataframe: array

Are these kinds of nesting supported? I'd be very grateful for help figuring out how to get past the "INSERT INTO" step with the example nested dataset.


Solution

  • I too would like to know whether this is supported, although my testing suggests that it is not, at least not directly or easily.

    An alternative approach is to translate the nested data to some other format, such as raw or JSON, before storage and then translate it back upon retrieval; a sketch of the raw/serialize variant follows the JSON example at the end of this answer.

    Here is an example using JSON:

    sub_fn1 = function(x) data.frame(key = base::sample(LETTERS, x), val = rnorm(x))
    sub_fn2 = function(x) purrr::map2(base::sample(LETTERS, x), rnorm(x), ~ list(key = .x, val = .y))
    
    # data1, data2 as data.frame
    d2 = dplyr::tibble(
      id = 1:3, 
      name = c('x', 'y', 'z'),
      data1 = purrr::map(c(3, 5, 4), sub_fn1),
      data2 = purrr::map(c(3, 5, 4), sub_fn2)
    )
    
    # translate data1, data2 to json for storage
    d2 <- d2 |>
      dplyr::mutate(
        data1 = purrr::map_chr(data1, jsonlite::toJSON),
        data2 = purrr::map_chr(data2, jsonlite::toJSON)
      )
    
    SparkR::dropTempView('tmp_v_1')
    SparkR::createOrReplaceTempView(SparkR::as.DataFrame(d2), 'tmp_v_1')
    
    my_schema2 = SparkR::structType(
      SparkR::structField("id", "double"), 
      SparkR::structField("name", "string"), 
      SparkR::structField("data1", "string"),
      SparkR::structField("data2", "string")
    )
    
    SparkR::sql("DROP TABLE IF EXISTS x.y.z;")
    SparkR::createTable('x.y.z', schema = my_schema2)
    
    SparkR::sql('INSERT INTO x.y.z TABLE tmp_v_1;')
    
    # back translate
    SparkR::sql('SELECT * FROM x.y.z') |>
      SparkR::collect() |>
      dplyr::mutate(
        data1 = purrr::map(data1, \(x) x |> jsonlite::fromJSON() |> as.data.frame()),
        data2 = purrr::map(data2, \(x) x |> jsonlite::fromJSON() |> as.data.frame())
      ) |>
      dplyr::as_tibble()
    #> # A tibble: 3 × 4
    #>      id name  data1        data2       
    #>   <dbl> <chr> <list>       <list>      
    #> 1     1 x     <df [3 × 2]> <df [3 × 2]>
    #> 2     2 y     <df [5 × 2]> <df [5 × 2]>
    #> 3     3 z     <df [4 × 2]> <df [4 × 2]>
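
    For the "raw" option mentioned above, a rough (untested) sketch could serialize each nested element and base64-encode it into the same string-typed columns. This assumes d2 is still the original list-column tibble (before the JSON mutate), and d2_raw is just an illustrative name:

    # hypothetical raw variant: serialize() + base64 instead of JSON
    d2_raw <- d2 |>
      dplyr::mutate(
        data1 = purrr::map_chr(data1, \(x) jsonlite::base64_enc(serialize(x, NULL))),
        data2 = purrr::map_chr(data2, \(x) jsonlite::base64_enc(serialize(x, NULL)))
      )
    
    # store via the same string-typed schema and INSERT INTO as above, then
    # back-translate after SparkR::collect(), e.g.
    # purrr::map(data1, \(x) unserialize(jsonlite::base64_dec(x)))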