
How can i send my structured streaming dataframe to kafka?

Hello everyone !

I'm trying to send my structured streaming dataframe to one of my kafka topics, detection.

This is the schema of the structued streaming dataframe:

 |-- timestamp: timestamp (nullable = true)
 |-- Sigma: string (nullable = true)
 |-- time: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- SourceComputer: string (nullable = true)
 |-- SourcePort: string (nullable = true)
 |-- DestinationComputer: string (nullable = true)
 |-- DestinationPort: string (nullable = false)
 |-- protocol: string (nullable = true)
 |-- packetCount: string (nullable = true)
 |-- byteCount: string (nullable = true)

but then i try to send the dataframe, with this method:

dfwriter=df \
  .selectExpr("CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("checkpointLocation", "/Documents/checkpoint/logs") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("failOnDataLoss", "false") \
  .option("topic", detection) \

Then i got the error :

pyspark.sql.utils.AnalysisException: cannot resolve 'value' given input columns: [DestinationComputer, DestinationPort, Sigma, SourceComputer, SourcePort, byteCount, duration, packetCount, processName, protocol, time, timestamp]; line 1 pos 5;

If i send a dataframe with juste the column value it works, i receive the data on my kafka topic consumer.

Any idea to send my dataframe with all columns ?

Thank you !


  • Your dataframe has no value column, as the error says.

    You'd need to "embed" all columns under a value StructType column, then use a function like to_json, not CAST( .. AS STRING)

    In Pyspark, that'd be something like struct(to_json(struct($"*")).as("value") within a select query

    Similar question - Convert all the columns of a spark dataframe into a json format and then include the json formatted data as a column in another/parent dataframe