scalaapache-sparkgoogle-bigquerygoogle-cloud-dataproc

Spark The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>"


Recently we have been migrated to dataproc image 2.2 version along with supporting scala 2.12.18 and spark 3.5 version.

package test

import org.apache.spark.sql.SparkSession
import test.Model._

object readbigquery   {
  def main(args: Array[String]) = {

    val runLocally = true
    val jobName ="Test Spark Logging Case"

    implicit val spark: SparkSession = Some(SparkSession.builder.appName(jobName))
      .map(sparkSessionBuilder =>
        if (runLocally) sparkSessionBuilder.master("local[2]") else sparkSessionBuilder
      )
      .map(_.getOrCreate())
      .get

    val datasetConfigurationRowsRaw = getDatasetConfigurations(spark,confProjectId="test-project",mappingsDatasetName="MAPPINGS",datasetConfigurationsTableName="dataset_configurations")
    println(s"datasetConfigurationRowsRaw:$datasetConfigurationRowsRaw")

  }

  def getDatasetConfigurations(
                                spark: SparkSession,
                                confProjectId: String,
                                mappingsDatasetName: String,
                                datasetConfigurationsTableName: String,
                              ): Seq[DatasetConfigurationRow] = {

    import org.apache.spark.sql.functions._
    import spark.implicits._
    spark.read
      .format("bigquery")
      .option("table", s"$confProjectId.$mappingsDatasetName.$datasetConfigurationsTableName")
      .option("project", confProjectId)
      .load()
      .select(
        col("technology"),
        col("name"),
        col("identifier_column_names"),
        col("column_mappings"),
        col("timestamp_column_name"))
      .as[DatasetConfigurationRow]
      .collect()
  }
}

package test

object Model {

case class DatasetConfigurationRow
(
  technology: String,
  name: String,
  identifier_column_names: Seq[String],
  column_mappings: Seq[ColumnMapping],
  timestamp_column_name: String,
)


case class ColumnMapping
(
  mapping_type: String,
  source_column_name: String,
  column_name: String,
  name: String,
  display_name: String,
  description: String,
  keep_source_column: Boolean,
  formula: String,
  functions: Functions
)

case class DataUnitFunction(key: String, value: String)

case class Functions
(
  fun_temporal: String,
  fun_regional: String,
  fun_temporal_unit: Seq[DataUnitFunction],
  fun_regional_unit: Seq[DataUnitFunction]
)

}

Above is the snippet of the actual code which was working fine with dataproc image 2.0 and scala 2.12.16 and spark 3.4 version.

Here we are reading bigquery table and trying to load them in set of case classes as they are defined above. Now we are facing below issue

Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>".

I did try with some maps and changing the datatype of case classes but didn't worked out. Looks like it was failing while loading the DataUnitFunction.

I did also give a try reading it into dataframe and then using maps to convert and then load it into the case class. This approach also changing our code a lot given the complexity and schema of our bigquery table.

adding schema of a given bigquery table

CREATE TABLE `test-project.MAPPINGS.dataset_configurations`
(
  technology STRING,
  name STRING,
  identifier_column_names ARRAY<STRING>,
  column_mappings ARRAY<STRUCT<mapping_type STRING, source_column_name STRING, column_name STRING, name STRING, display_name STRING, description STRING, keep_source_column BOOL, formula STRING, functions STRUCT<fun_temporal STRING, fun_regional STRING, fun_temporal_unit ARRAY<STRUCT<key STRING, value STRING>>, fun_regional_unit ARRAY<STRUCT<key STRING, value STRING>>>>>,
  timestamp_column_name STRING
)
;

Solution

  • I had to change the schema of the case class Functions to solve this.

      case class Functions
    (
      fun_temporal: String,
      fun_regional: String,
      fun_temporal_unit: Option[Map[String,String]]=None,
      fun_regional_unit: Option[Map[String,String]]=None
    )
    

    Below is the dataframe schema which is loaded successfully to the set of case classes.

    |-- technology: string (nullable = true)
     |-- name: string (nullable = true)
     |-- identifier_column_names: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- timestamp_column_name: string (nullable = true)
     |-- column_mappings: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- mapping_type: string (nullable = true)
     |    |    |-- source_column_name: string (nullable = true)
     |    |    |-- column_name: string (nullable = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- display_name: string (nullable = true)
     |    |    |-- description: string (nullable = true)
     |    |    |-- keep_source_column: boolean (nullable = true)
     |    |    |-- formula: string (nullable = true)
     |    |    |-- functions: struct (nullable = true)
     |    |    |    |-- fun_temporal: string (nullable = true)
     |    |    |    |-- fun_regional: string (nullable = true)
     |    |    |    |-- fun_temporal_unit: map (nullable = false)
     |    |    |    |    |-- key: string
     |    |    |    |    |-- value: string (valueContainsNull = true)
     |    |    |    |-- fun_regional_unit: map (nullable = false)
     |    |    |    |    |-- key: string
     |    |    |    |    |-- value: string (valueContainsNull = true)