Recently we have been migrated to dataproc image 2.2
version along with supporting scala 2.12.18
and spark 3.5 version
.
package test
import org.apache.spark.sql.SparkSession
import test.Model._
object readbigquery {
def main(args: Array[String]) = {
val runLocally = true
val jobName ="Test Spark Logging Case"
implicit val spark: SparkSession = Some(SparkSession.builder.appName(jobName))
.map(sparkSessionBuilder =>
if (runLocally) sparkSessionBuilder.master("local[2]") else sparkSessionBuilder
)
.map(_.getOrCreate())
.get
val datasetConfigurationRowsRaw = getDatasetConfigurations(spark,confProjectId="test-project",mappingsDatasetName="MAPPINGS",datasetConfigurationsTableName="dataset_configurations")
println(s"datasetConfigurationRowsRaw:$datasetConfigurationRowsRaw")
}
def getDatasetConfigurations(
spark: SparkSession,
confProjectId: String,
mappingsDatasetName: String,
datasetConfigurationsTableName: String,
): Seq[DatasetConfigurationRow] = {
import org.apache.spark.sql.functions._
import spark.implicits._
spark.read
.format("bigquery")
.option("table", s"$confProjectId.$mappingsDatasetName.$datasetConfigurationsTableName")
.option("project", confProjectId)
.load()
.select(
col("technology"),
col("name"),
col("identifier_column_names"),
col("column_mappings"),
col("timestamp_column_name"))
.as[DatasetConfigurationRow]
.collect()
}
}
package test
object Model {
case class DatasetConfigurationRow
(
technology: String,
name: String,
identifier_column_names: Seq[String],
column_mappings: Seq[ColumnMapping],
timestamp_column_name: String,
)
case class ColumnMapping
(
mapping_type: String,
source_column_name: String,
column_name: String,
name: String,
display_name: String,
description: String,
keep_source_column: Boolean,
formula: String,
functions: Functions
)
case class DataUnitFunction(key: String, value: String)
case class Functions
(
fun_temporal: String,
fun_regional: String,
fun_temporal_unit: Seq[DataUnitFunction],
fun_regional_unit: Seq[DataUnitFunction]
)
}
Above is the snippet of the actual code which was working fine with dataproc image 2.0
and scala 2.12.16
and spark 3.4 version
.
Here we are reading bigquery table and trying to load them in set of case classes as they are defined above. Now we are facing below issue
Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>".
I did try with some maps and changing the datatype of case classes but didn't worked out. Looks like it was failing while loading the DataUnitFunction
.
I did also give a try reading it into dataframe and then using maps to convert and then load it into the case class. This approach also changing our code a lot given the complexity and schema of our bigquery table.
adding schema of a given bigquery
table
CREATE TABLE `test-project.MAPPINGS.dataset_configurations`
(
technology STRING,
name STRING,
identifier_column_names ARRAY<STRING>,
column_mappings ARRAY<STRUCT<mapping_type STRING, source_column_name STRING, column_name STRING, name STRING, display_name STRING, description STRING, keep_source_column BOOL, formula STRING, functions STRUCT<fun_temporal STRING, fun_regional STRING, fun_temporal_unit ARRAY<STRUCT<key STRING, value STRING>>, fun_regional_unit ARRAY<STRUCT<key STRING, value STRING>>>>>,
timestamp_column_name STRING
)
;
I had to change the schema of the case class Functions
to solve this.
case class Functions
(
fun_temporal: String,
fun_regional: String,
fun_temporal_unit: Option[Map[String,String]]=None,
fun_regional_unit: Option[Map[String,String]]=None
)
Below is the dataframe schema which is loaded successfully to the set of case classes.
|-- technology: string (nullable = true)
|-- name: string (nullable = true)
|-- identifier_column_names: array (nullable = true)
| |-- element: string (containsNull = true)
|-- timestamp_column_name: string (nullable = true)
|-- column_mappings: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- mapping_type: string (nullable = true)
| | |-- source_column_name: string (nullable = true)
| | |-- column_name: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- display_name: string (nullable = true)
| | |-- description: string (nullable = true)
| | |-- keep_source_column: boolean (nullable = true)
| | |-- formula: string (nullable = true)
| | |-- functions: struct (nullable = true)
| | | |-- fun_temporal: string (nullable = true)
| | | |-- fun_regional: string (nullable = true)
| | | |-- fun_temporal_unit: map (nullable = false)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)
| | | |-- fun_regional_unit: map (nullable = false)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)