python, apache-spark, apache-kafka, spark-kafka-integration

Get two different kinds of data from one Kafka topic into two dataframes


I have a homework assignment like this:

  1. Use Python to read the JSON files in the two folders song_data and log_data.
  2. Use Python Kafka to publish a mixture of both song_data and log_data file types into a Kafka topic.
  3. Use PySpark to consume data from the above Kafka topic.
  4. Use stream processing to consume the song_data messages and create two dataframes, songs and artists, and from the log_data messages create the users and time dataframes.
  5. Create songplays from the dimension-table dataframes.

My problem is reading the two different file types from one topic: both folders contain JSON files, but one holds song data and the other holds log data. How can I separate each kind of data from just one topic?


Solution

  • It's unclear why you cannot just use two topics, one for each file type, especially since they don't have matching schemas, which will matter for Spark SQL.

    How can I separate each kind of data from just one topic?

    It begins at step 2.

    Write the data to your single topic in a format like this (content shown for example purposes only):

    {"type": "song", "content": "..."}
    

    or

    {"type": "log", "content": "..."}
    
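    For step 2, here is a minimal producer sketch using the kafka-python package; the topic name events, the broker address, and the assumption of one JSON record per line are all placeholders to adjust for your setup:

    import json
    from pathlib import Path

    from kafka import KafkaProducer  # pip install kafka-python

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    # Tag each record with its source folder so a consumer can tell them apart.
    for folder, record_type in [("song_data", "song"), ("log_data", "log")]:
        for path in Path(folder).rglob("*.json"):
            for line in path.read_text().splitlines():
                if line.strip():
                    producer.send("events", {"type": record_type, "content": line})

    producer.flush()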

    Then, in Spark SQL, you can do something like this:

    df = spark.readStream.format("kafka")... # TODO: parse the value and apply a schema to get "type" and "content" columns
    song_data = df.where(df["type"] == "song").select("content")
    log_data = df.where(df["type"] == "log").select("content")
    
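    To fill in that TODO, here is a minimal sketch, assuming the topic name events from the producer example above; the spark-sql-kafka connector package must also be on the classpath when the job is submitted:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructField, StructType

    spark = SparkSession.builder.appName("split-kafka-topic").getOrCreate()

    # Kafka delivers the payload as binary, so cast it to a string and
    # parse the JSON envelope written by the producer.
    envelope = StructType([
        StructField("type", StringType()),
        StructField("content", StringType()),
    ])

    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .load()
        .select(from_json(col("value").cast("string"), envelope).alias("msg"))
        .select("msg.type", "msg.content")
    )

    song_data = df.where(col("type") == "song").select("content")
    log_data = df.where(col("type") == "log").select("content")

    From there, each stream's content column can be parsed with a second from_json call using the song or log schema to build the songs, artists, users, and time dataframes.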

    You could also do the same filtering in plain kafka-python and not need dataframes or a Spark environment at all.
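    For completeness, here is a sketch of that alternative, again assuming the topic name events and the envelope format above:

    import json

    from kafka import KafkaConsumer  # pip install kafka-python

    consumer = KafkaConsumer(
        "events",
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )

    songs, logs = [], []
    for message in consumer:
        # Route each record by the "type" field the producer attached.
        record = message.value
        (songs if record["type"] == "song" else logs).append(record["content"])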