I am sending compressed data to Event Hubs to work around the 1 MB hard limit in Azure Event Hubs. I also have to read this in PySpark and update a Delta table.
The compressed data sent to Event Hubs comes back as null in the PySpark stream. How can I read it?
This is how I am reading from Event Hub
df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
This is how I am sending data to Event Hub:

import gzip
import base64
from azure.eventhub import EventData

event_data_batch = await producer.create_batch()
# Add events to the batch.
body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'
# Compress the JSON string using the gzip algorithm.
compressed_body = gzip.compress(body.encode('utf-8'))
# Encode the compressed JSON string to base64.
encoded_compressed_body = base64.b64encode(compressed_body)
event_data_batch.add(EventData(encoded_compressed_body))
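For context, this is roughly the full producer flow; the connection string, event hub name, and client setup below are placeholders, not my real values:

import asyncio
import gzip
import base64
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

CONNECTION_STR = "<event-hubs-namespace-connection-string>"  # placeholder
EVENTHUB_NAME = "<event-hub-name>"                           # placeholder

async def send_compressed_event():
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME
    )
    async with producer:
        event_data_batch = await producer.create_batch()
        body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'
        # gzip-compress the JSON, then base64-encode so the event body is plain ASCII bytes.
        encoded_compressed_body = base64.b64encode(gzip.compress(body.encode("utf-8")))
        event_data_batch.add(EventData(encoded_compressed_body))
        await producer.send_batch(event_data_batch)

asyncio.run(send_compressed_event())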
I tried to read with the gzip option, but it's giving me null.
df_stream = spark.readStream.format("eventhubs") \
    .options(**ehConf) \
    .option("compression", "gzip") \
    .load()
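Here ehConf is the usual azure-event-hubs-spark connector configuration; a minimal sketch, assuming a placeholder connection string and the default consumer group:

# Assumed connector configuration; the connection string is a placeholder.
connection_string = "Endpoint=sb://<namespace>.servicebus.windows.net/;...;EntityPath=<event-hub-name>"

ehConf = {
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    "eventhubs.consumerGroup": "$Default",
}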
And this is how I read the body column:
def foreach_batch_function(df_stream, epoch_id):
    # temp_view_name = "stream_temp"
    df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
    df_stream_body.createOrReplaceTempView("stream_temp")
    # df_stream_body.printSchema()
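For completeness, the batch function is attached to the stream in the usual foreachBatch way; the checkpoint path below is a placeholder:

# How foreach_batch_function is hooked up to the stream (checkpoint path is a placeholder).
query = (
    df_stream.writeStream
    .foreachBatch(foreach_batch_function)
    .option("checkpointLocation", "/tmp/checkpoints/eventhub_stream")
    .start()
)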
You need to decompress the body column yourself; the option .option("compression", "gzip") is for compressed files, not for compressed data inside a column.
So you need to create a user-defined function that base64-decodes and decompresses the data. Use the code below.
UDF
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import gzip, base64, json

def unZip(binary_string):
    # base64-decode the value, then gunzip it back to the original JSON string.
    return gzip.decompress(base64.b64decode(binary_string)).decode('utf-8')

unzip = F.udf(unZip, StringType())
Next, create a new column with the decompressed data:
df.withColumn("bd", unzip(F.col("body").cast('string'))).display()
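If you then need the parsed payload (as with the from_json call in your question), you can chain the UDF with from_json; a minimal sketch, assuming message_schema is the same schema used above:

# Decode/decompress the body, then parse the JSON with the question's message_schema.
df_decoded = (
    df.withColumn("bd", unzip(F.col("body").cast("string")))
      .withColumn("Payload", F.from_json(F.col("bd"), message_schema))
)
df_decoded.select("Payload.*").display()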