json, scala, apache-spark, spark-streaming-kafka

Array of JSON to DataFrame in Spark received from Kafka


I'm writing a Spark application in Scala using Spark Structured Streaming that receives JSON-formatted data from Kafka. The application can receive either a single JSON object or multiple JSON objects, formatted in this way:

[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]

I tried to define a StructType like:

val schema = StructType(Array(
  StructField("key1", DataTypes.StringType),
  StructField("key2", DataTypes.StringType)
))

But it doesn't work. Here is my current code for parsing the JSON:

val data = this.stream.getStreamer().load()
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))

I would like to get these JSON objects into a DataFrame like:

+----------+---------+
|      key1|     key2|
+----------+---------+
|    value1|   value2|
|    value1|   value2|
|       ...|      ...|
|    value1|   value2|
+----------+---------+

Can anyone help me, please? Thank you!


Solution

  • As your incoming string is an array of JSON objects, one way is to write a UDF to parse the array and then explode the parsed array. Below is the complete code with each step explained. I have written it for batch, but the same approach works for streaming with minimal changes; a streaming sketch follows the code.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import org.apache.spark.sql.SparkSession

    object JsonParser {

      //case class to parse the incoming JSON String
      case class JSON(key1: String, key2: String)
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.
          builder().
          appName("JSON").
          master("local").
          getOrCreate()
    
        import spark.implicits._
        import org.apache.spark.sql.functions._
    
        //sample JSON array String coming from kafka
        val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")
    
        //UDF to parse JSON array String
        val jsonConverter = udf { jsonString: String =>
          val mapper = new ObjectMapper()
          mapper.registerModule(DefaultScalaModule)
          mapper.readValue(jsonString, classOf[Array[JSON]])
        }
    
        val df = str.toDF("json") //json String column
          .withColumn("array", jsonConverter($"json")) //parse the JSON Array
          .withColumn("json", explode($"array")) //explode the Array
          .drop("array") //drop unwanted columns
          .select("json.*") //explode the JSON to separate columns
    
        //display the DF
        df.show()
        //+------+------+
        //|  key1|  key2|
        //+------+------+
        //|value1|value2|
        //|value3|value4|
        //+------+------+
    
      }
    }
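
    For the streaming case from the question, only the source and sink change: read from Kafka with readStream, run the same UDF and explode, and write the result with writeStream. Below is a minimal sketch, assuming a broker at localhost:9092 and a topic named topic1 (both placeholders; substitute your own Kafka settings):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import org.apache.spark.sql.SparkSession

    object JsonStreamParser {

      //same case class as in the batch version
      case class JSON(key1: String, key2: String)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.
          builder().
          appName("JSONStream").
          master("local[*]").
          getOrCreate()

        import spark.implicits._
        import org.apache.spark.sql.functions._

        //same UDF as in the batch version
        val jsonConverter = udf { jsonString: String =>
          val mapper = new ObjectMapper()
          mapper.registerModule(DefaultScalaModule)
          mapper.readValue(jsonString, classOf[Array[JSON]])
        }

        //read from Kafka instead of a local Seq
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092") //placeholder broker
          .option("subscribe", "topic1")                       //placeholder topic
          .load()
          .selectExpr("CAST(value AS STRING) as json")
          .withColumn("array", jsonConverter($"json")) //parse the JSON Array
          .withColumn("json", explode($"array"))       //explode the Array
          .drop("array")
          .select("json.*")

        //console sink just for demonstration; use your real sink here
        df.writeStream
          .format("console")
          .outputMode("append")
          .start()
          .awaitTermination()
      }
    }

    Alternatively, if you are on Spark 2.4 or later (an assumption; check your version), from_json accepts an ArrayType schema directly, so no UDF is needed. Assuming df is a DataFrame with the string column json, as in your original code:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    //schema for the whole array, not a single object
    val arraySchema = ArrayType(StructType(Array(
      StructField("key1", StringType),
      StructField("key2", StringType)
    )))

    //parse the array, explode it, then flatten the struct into columns
    val parsed = df
      .select(explode(from_json(col("json"), arraySchema)).as("data"))
      .select("data.*")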