python pyspark protocol-buffers protobuf-python

PySpark-Streaming protobuf message gives error: AttributeError: 'bytearray' object has no attribute 'WhichOneof'


I am reading protobuf messages from a Kafka topic using PySpark Structured Streaming.
Calling msg.WhichOneof fails with AttributeError: 'bytearray' object has no attribute 'WhichOneof'.
The schema .proto file is:

message InstrumentStatusUpdate {
    ....
    ..
}
...
..
message MarketDataEvent {
    oneof event {
        InstrumentStatusUpdate instrumentStatusUpdate = 1;
        Trade trade = 2;
    }
};
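For reference, WhichOneof is a method of a decoded message object: it returns the name of the oneof member that is currently set, or None if none is set. The sketch below illustrates this with google.protobuf.struct_pb2.Value, used here only as a stand-in for MarketDataEvent because it ships with the protobuf package and declares a oneof named kind (the generated schema_pb2 module is not available here).

```python
from google.protobuf import struct_pb2

# Stand-in for MarketDataEvent: struct_pb2.Value declares `oneof kind { ... }`.
v = struct_pb2.Value()
print(v.WhichOneof("kind"))  # None: no member of the oneof is set yet

v.number_value = 1.5
print(v.WhichOneof("kind"))  # number_value

# Assigning another member of the same oneof clears the previous one.
v.string_value = "x"
print(v.WhichOneof("kind"))  # string_value
print(v.HasField("number_value"))  # False
```

With the real schema, the equivalent call would be msg.WhichOneof("event"), returning "instrumentStatusUpdate", "trade", or None.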

PySpark code I'm using:

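from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

import schema_pb2  # module generated by protoc from the schema .proto file

def parse_options_monitor_msg(msg:schema_pb2.MarketDataEvent):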
    eventStr = msg.WhichOneof("event")
    if eventStr == "trade":
        trade_list = []
        trade_msg = msg.trade
        symbol = trade_msg.symbol
        ticker = symbol.split('_')[0]
        trade_list = [symbol,ticker]
        return str(trade_list)

parse_options_monitor = udf(parse_options_monitor_msg, StringType())

df = spark.readStream \
            .format("kafka") \
            .options(**kafka_conf) \
            .load()
data = df.selectExpr("offset", "value") \
         .withColumn("event", parse_options_monitor(col("value")))

df2 = data.select(col("offset"),col("event"))

df2.writeStream \
            .format("console") \
            .outputMode("append") \
            .option("truncate", False) \
            .start() \
            .awaitTermination()
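The failure can be reproduced without Spark. The Kafka source exposes value as a binary column, and a Python UDF receives a binary value as a bytearray, so the function above is handed raw serialized bytes despite its type hint. This minimal sketch assumes google.protobuf.struct_pb2.Value as a stand-in for MarketDataEvent (it has a oneof named kind); the payload content "ABC_1" is made up.

```python
from google.protobuf import struct_pb2

# Hypothetical payload: a serialized stand-in message, shaped like what a
# Python UDF receives for a Kafka "value" cell (a bytearray).
payload = bytearray(struct_pb2.Value(string_value="ABC_1").SerializeToString())

error = None
try:
    payload.WhichOneof("kind")  # raw bytes carry no protobuf methods
except AttributeError as exc:
    error = str(exc)

print(error)  # 'bytearray' object has no attribute 'WhichOneof'
```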

How can I get rid of this error?


Solution

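  • The Kafka source delivers the value column as binary, so the UDF receives raw bytes (a Python bytearray) rather than a MarketDataEvent. I deserialized them first with ParseFromString():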

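    def parse_options_monitor_msg(msg_bytes: bytearray):
        # The Kafka "value" column is binary, so the UDF gets raw bytes;
        # decode them into a MarketDataEvent before calling WhichOneof.
        msg = schema_pb2.MarketDataEvent()
        msg.ParseFromString(bytes(msg_bytes))
        eventStr = msg.WhichOneof("event")
        if eventStr == "trade":
            trade_msg = msg.trade
            symbol = trade_msg.symbol
            ticker = symbol.split('_')[0]
            trade_list = [symbol, ticker]
            return str(trade_list)
        # Any other event (e.g. instrumentStatusUpdate) yields null.
        return None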
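The decode-then-dispatch logic can be checked outside Spark. The sketch below is hypothetical: parse_event mirrors the fixed UDF but uses google.protobuf.struct_pb2.Value as a stand-in schema (its oneof kind plays the role of event, and string_value plays the role of trade.symbol), and it also returns None for a null input, since a Kafka record's value may be null (for example, a tombstone).

```python
from typing import Optional

from google.protobuf import struct_pb2


def parse_event(msg_bytes: Optional[bytearray]) -> Optional[str]:
    """Hypothetical mirror of the fixed UDF, using struct_pb2.Value as the schema."""
    if msg_bytes is None:  # null Kafka value -> null output
        return None
    msg = struct_pb2.Value()
    msg.ParseFromString(bytes(msg_bytes))  # raw bytes -> message object
    if msg.WhichOneof("kind") == "string_value":
        symbol = msg.string_value
        ticker = symbol.split("_")[0]
        return str([symbol, ticker])
    return None  # any other oneof member -> null in the output column


payload = bytearray(struct_pb2.Value(string_value="ABC_1").SerializeToString())
print(parse_event(payload))  # ['ABC_1', 'ABC']
print(parse_event(None))  # None
```

In the streaming job, the real function would be wrapped the same way as in the question, e.g. udf(parse_options_monitor_msg, StringType()).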