On an Ubuntu server I set up Divolte Collector to gather clickstream data from websites. The data is written to a Kafka topic named divolte-data. With a Kafka consumer I can see the data coming in:
V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.
Then I would like to visualize the data with Airbnb Superset, which has connectors to several common databases, including druid.io (which in turn can read from Spark).
It seems Divolte stores the data in Kafka in an unstructured (binary) way, but apparently it can map the data into a structured form. Should the input data be structured as JSON, as the docs say?
And then, how can Druid/Tranquility read the data arriving on the divolte-data Kafka topic? I tried changing the topic name in the example configs, but that consumer then receives zero messages.
The solution I found is that I can process the Kafka messages in Python, for example with the kafka-python library or Confluent's confluent-kafka, and then decode the messages with Avro readers.
I thought the Avro library was only for reading Avro files, but it actually solves the problem of decoding Kafka messages, as follows: I first import the libraries and pass the schema file as a parameter, then create a function that decodes a message into a dictionary, which I can use in the consumer loop.
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
import io

# Parse the Avro schema Divolte used to serialize the events
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    # Deserialize the raw Avro-encoded bytes into a Python dictionary
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

# Broker address and group id are examples; adjust to your cluster
c = Consumer({'bootstrap.servers': 'localhost:9092',
              'group.id': 'divolte-consumer'})
c.subscribe(['divolte-data'])
running = True
while running:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
c.close()