apache-spark apache-kafka spark-streaming spark-avro

How to convert bytes from Kafka to their original object?


I am fetching data from Kafka and then deserializing the Array[Byte] with the default decoder. After that my RDD elements look like (null,[B@406fa9b2), (null,[B@21a9fe0), but I want my original data back, which has a schema. How can I achieve this?

I serialize messages in Avro format.


Solution

  • You have to decode the bytes using proper deserializers, say to strings or your custom object.

    If you don't do the decoding you get [B@406fa9b2, which is simply the default text representation of a byte array in Java.
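
    For instance, a quick check in the Scala REPL shows where that string comes from (the payload here is just an illustration):

    val bytes = "hello".getBytes("UTF-8")
    println(bytes.toString)              // prints something like [B@406fa9b2
    println(new String(bytes, "UTF-8"))  // prints hello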

    Kafka knows nothing about the content of your messages and so it passes byte arrays from producers to consumers.

    In Spark Streaming the producer side configures serializers for keys and values (quoting Spark's KafkaWordCount example):

    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    

    With string serializers on the producer side and matching string deserializers on the consumer side you get DStream[String], so you work with RDD[String].
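
    On the consuming side the deserializers have to match. A minimal sketch, assuming the spark-streaming-kafka-0-10 connector, an existing StreamingContext called ssc, a broker at localhost:9092 and a topic topic1 (all of these are placeholders):

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // Matching String deserializers on the consumer side
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "example-group",
      "auto.offset.reset"  -> "latest")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Set("topic1"), kafkaParams))

    // Every record is a ConsumerRecord[String, String] now
    val lines = stream.map(_.value)  // DStream[String]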

    If, however, you want to deserialize the byte arrays to your custom class directly, you'd have to write a custom Deserializer (which is Kafka-specific and has nothing to do with Spark).

    What I'd recommend is to use JSON with a fixed schema or Avro (with a solution described in Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages).
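
    For the Avro route, a custom Deserializer could look like the sketch below. It is only a sketch: it assumes every message carries plain Avro binary written with a single, known schema (no schema-registry framing), and the class name and constructor are made up for the example.

    import java.util
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    import org.apache.kafka.common.serialization.Deserializer

    // Turns raw Avro bytes back into a GenericRecord using a fixed schema
    class AvroGenericDeserializer(schemaJson: String) extends Deserializer[GenericRecord] {
      private val schema = new Schema.Parser().parse(schemaJson)
      private val reader = new GenericDatumReader[GenericRecord](schema)

      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

      override def deserialize(topic: String, data: Array[Byte]): GenericRecord =
        if (data == null) null
        else reader.read(null, DecoderFactory.get.binaryDecoder(data, null))

      override def close(): Unit = ()
    }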


    In Structured Streaming, however, the pipeline could look as follows (assuming spark is an existing SparkSession):

    import spark.implicits._  // enables the 'value column syntax below

    val fromKafka = spark.
      readStream.
      format("kafka").
      option("subscribe", "topic1").
      option("kafka.bootstrap.servers", "localhost:9092").
      load.
      select('value cast "string") // <-- conversion here
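
    The snippet above only turns the bytes into text. If the payload is Avro, you would keep value as binary and decode it yourself, e.g. with a UDF (on Spark 2.4+ the spark-avro module also ships a from_avro function for this). A rough sketch with a hypothetical one-field schema:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    import org.apache.spark.sql.functions.{col, udf}

    // Hypothetical schema; use the schema your producer actually wrote with
    val schemaJson =
      """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""

    // Decodes one Avro-encoded message and extracts its "id" field
    // (the schema is parsed per call to keep the closure simple)
    val decodeId = udf { (bytes: Array[Byte]) =>
      val schema = new Schema.Parser().parse(schemaJson)
      val record = new GenericDatumReader[GenericRecord](schema)
        .read(null, DecoderFactory.get.binaryDecoder(bytes, null))
      record.get("id").toString
    }

    val events = spark.
      readStream.
      format("kafka").
      option("subscribe", "topic1").
      option("kafka.bootstrap.servers", "localhost:9092").
      load.
      select(decodeId(col("value")) as "id")  // value is still Array[Byte] here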