I'm using PySpark Structured Streaming in my application, where I use readStream to read appended data from an Apache Iceberg table. After processing the data in PySpark, I want to send the processed data to a WebSocket client using the websockets library in Python.
I tried using .foreach(), but I could not use await inside it.
I found a workaround using foreachBatch:
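import asyncio
from functools import partial

async def process(df, df_id, websocket):
    # data processing (placeholder): this step is expected to produce `data` from df
    await websocket.send(data)

def process_wrapper(batch_df, batch_id, websocket):
    # foreachBatch expects a regular (synchronous) callable,
    # so the coroutine is run to completion here
    asyncio.run(process(batch_df, batch_id, websocket))

query = df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(partial(process_wrapper, websocket=websocket)) \
    .trigger(processingTime="10 seconds") \
    .start()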
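
One thing I'm unsure about with this approach: asyncio.run creates a fresh event loop on every call, so a websocket that was opened on a different loop may not be usable from inside it. Below is a minimal sketch of an alternative, where a single long-lived event loop runs in a background thread and each batch submits its coroutine to it with asyncio.run_coroutine_threadsafe. To keep it runnable without Spark or a server, FakeWebSocket and start_background_loop are hypothetical helpers I made up for illustration, and a plain list stands in for the batch DataFrame.

```python
import asyncio
import threading
from functools import partial


class FakeWebSocket:
    """Hypothetical stand-in for a websockets connection; it only records messages."""

    def __init__(self):
        self.sent = []

    async def send(self, data):
        self.sent.append(data)


def start_background_loop():
    """Start one long-lived event loop in a daemon thread and return it (hypothetical helper)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop


async def process(rows, batch_id, websocket):
    # Placeholder for the real processing: each row is sent as a string.
    for row in rows:
        await websocket.send(f"{batch_id}:{row}")


def process_wrapper(batch_df, batch_id, websocket, loop):
    # Assumption: in a real job batch_df is a Spark DataFrame and the rows
    # would come from something like batch_df.collect(); here it is a list.
    future = asyncio.run_coroutine_threadsafe(
        process(batch_df, batch_id, websocket), loop
    )
    # Block until the coroutine finishes so the batch is only marked done
    # after the data has been sent.
    future.result()


loop = start_background_loop()
websocket = FakeWebSocket()
handler = partial(process_wrapper, websocket=websocket, loop=loop)

# Simulate what foreachBatch would do: call handler(batch_df, batch_id).
handler(["a", "b"], 0)
print(websocket.sent)

# Stop the background loop once streaming is finished.
loop.call_soon_threadsafe(loop.stop)
```

In the real job, the websocket connection would presumably need to be opened on that same background loop, and handler would be passed to .foreachBatch(...) in place of partial(process_wrapper, websocket=websocket).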