apache-spark, spark-bigquery-connector

How to write to a JSON Column in BigQuery using Spark Scala


I have a DataFrame that contains a JSON string as a StringType column. How can this be written to a BigQuery JSON column (a new feature)? The spark-bigquery-connector repository does not provide an example of writing JSON from Spark to BigQuery: https://github.com/GoogleCloudDataproc/spark-bigquery-connector

When we try to write the string in append mode to a JSON column, we get the following error:

Field source has changed type from JSON to STRING
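A minimal sketch of the kind of append write that triggers this error (the DataFrame df, bucket, and table names here are placeholders):

    // Appending a plain StringType column (without any JSON metadata) into an
    // existing BigQuery JSON column is rejected with the schema-mismatch error above.
    df.write
      .format("bigquery")
      .option("temporaryGcsBucket", "temp-bucket-name")
      .mode("append")
      .option("table", "schema.table")
      .save()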

According to the README -> Data Types -> JSON section, this should be supported.

How can this be implemented? An example would help.


Solution

  • The Spark BigQuery Connector's handling of JSON is described in its GitHub README:

    Spark has no JSON type. The values are read as String. In order to write JSON back to BigQuery, the following conditions are REQUIRED: use the INDIRECT write method, use the AVRO intermediate format, and the DataFrame field MUST be of type String and have an entry of sqlType=JSON in its metadata.

    I raised issues for clarification, which can be found here: #880 and #882.

    In the Scala Spark example below, I have a table that contains two JSON columns, metadata and data. We first create a table_schema whose fields carry the sqlType=JSON metadata, then assign that schema to the Spark RDD. With that in place, we can write directly to the JSON columns in BigQuery using Scala Spark.

    import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}

    // Both columns are StringType, with sqlType=JSON in the field metadata
    // so the connector maps them to BigQuery JSON columns.
    val table_schema = new StructType()
      .add(StructField("metadata", StringType, true, Metadata.fromJson("""{"sqlType":"JSON"}""")))
      .add(StructField("data", StringType, true, Metadata.fromJson("""{"sqlType":"JSON"}""")))

    // Create a BigQuery DataFrame from the RDD and the schema above
    val df_bigquery = spark.createDataFrame(rowRDD, table_schema)

    // Write to BigQuery using the Spark BigQuery Connector.
    // For JSON column ingestion, writeMethod must be "indirect", with a temporaryGcsBucket
    // and intermediateFormat set to "avro".
    // Target table details are provided in spark.datasource.bigquery.full_load_table.
    // The write mode (spark.datasource.bigquery.write.mode) can be either overwrite or append.
    df_bigquery.write
      .format("bigquery")
      .option("temporaryGcsBucket", "temp-bucket-name")
      .option("writeMethod", "indirect")
      .option("intermediateFormat", "avro")
      .mode("append")
      .option("table", "schema.table")
      .option("createDisposition", "CREATE_IF_NEEDED")
      .option("writeDisposition", "WRITE_TRUNCATE")
      .save()
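    If the DataFrame already holds the JSON payload in plain StringType columns (as in the question), the same sqlType=JSON metadata can also be attached to the existing columns instead of rebuilding the schema from an RDD. A minimal sketch, assuming a DataFrame df with string columns named metadata and data that contain valid JSON text:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.Metadata

    // Attach sqlType=JSON metadata to existing StringType columns so the
    // connector maps them to BigQuery JSON on the indirect/Avro write path.
    val jsonMeta = Metadata.fromJson("""{"sqlType":"JSON"}""")

    val df_with_json_meta = df
      .withColumn("metadata", col("metadata").as("metadata", jsonMeta))
      .withColumn("data", col("data").as("data", jsonMeta))

    The write call itself is then the same as above: indirect write method, Avro intermediate format, and a temporary GCS bucket.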