I am reading protobuf messages from a Kafka topic using PySpark streaming.
Calling msg.WhichOneof("event") inside a UDF
raises AttributeError: 'bytearray' object has no attribute 'WhichOneof'.
The protobuf schema file is:
message InstrumentStatusUpdate {
....
..
}
...
..
// Envelope message: the "event" oneof carries either an
// InstrumentStatusUpdate or a Trade.
message MarketDataEvent {
oneof event {
InstrumentStatusUpdate instrumentStatusUpdate = 1;
Trade trade = 2;
}
};
The PySpark code I'm using:
# Builds the string form of [symbol, ticker] from the trade carried by a
# MarketDataEvent.
# NOTE(review): the indentation of this snippet was lost, so it is not clear
# whether the final `return` belongs inside the `if` — confirm against the
# real file.
def parse_options_monitor_msg(msg:schema_pb2.MarketDataEvent):
# NOTE(review): the reported AttributeError shows that `msg` is a bytearray at
# runtime, not a parsed MarketDataEvent as the annotation claims, so this call
# is the line that fails.
eventStr = msg.WhichOneof("event")
if eventStr == "trade":
# This empty list is overwritten below before it is ever read.
trade_list = []
trade_msg = msg.trade
symbol = trade_msg.symbol
# The ticker is the part of the symbol before the first underscore.
ticker = symbol.split('_')[0]
trade_list = [symbol,ticker]
return str(trade_list)
# Expose the message parser as a Spark UDF producing a string column.
parse_options_monitor = udf(
    lambda raw_value: parse_options_monitor_msg(raw_value),
    StringType(),
)

# Streaming read from Kafka, configured entirely through kafka_conf.
df = spark.readStream.format("kafka").options(**kafka_conf).load()

# Keep the offset and payload, and derive the "event" column from the payload.
data = df.selectExpr("offset", "value").withColumn(
    "event",
    parse_options_monitor(col("value")),
)
df2 = data.select(col("offset"), col("event"))

# Append results to the console sink without truncating values, then wait
# for the streaming query to terminate.
(
    df2.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", False)
    .start()
    .awaitTermination()
)
How can I get rid of this error?
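I fixed it by adding a call to ParseFromString(): the UDF receives the Kafka
value as raw bytes (a bytearray), so it has to be parsed into a MarketDataEvent before WhichOneof can be called.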
def parse_options_monitor_msg(msg_bytes: "bytes | bytearray | None") -> "str | None":
    """Decode a serialized ``MarketDataEvent`` and summarise its trade, if any.

    Args:
        msg_bytes: Serialized protobuf payload, as handed to the UDF (the
            reported error shows it arrives as a ``bytearray``). ``None`` is
            accepted so that a null value does not crash the UDF.

    Returns:
        ``str([symbol, ticker])`` when the ``event`` oneof holds a ``trade``,
        where ``ticker`` is the part of ``symbol`` before the first ``_``.
        ``None`` for a null payload or for any other event.

    Raises:
        Whatever ``ParseFromString`` raises for a payload that is not a valid
        serialized ``MarketDataEvent``.
    """
    if msg_bytes is None:
        return None

    msg = schema_pb2.MarketDataEvent()
    # NOTE(review): the bytes() copy is a precaution in case the installed
    # protobuf backend does not accept a bytearray — confirm whether needed.
    msg.ParseFromString(bytes(msg_bytes))

    # Only trade events are summarised; every other case yields an explicit
    # None instead of relying on a variable that was never assigned.
    if msg.WhichOneof("event") != "trade":
        return None

    symbol = msg.trade.symbol
    ticker = symbol.split("_")[0]
    return str([symbol, ticker])