Protobuf: ParseFromString method gives error "raise message_mod.DecodeError('Field number 0 is illegal.')" for message with oneof field


I have a Kafka topic that contains protobuf messages in the following format:

message CreditTransaction {
    string date = 1;
    float amount = 2;
}

message DebitTransaction {
    string date = 1;
    float amount = 2;
}
...
.. # other message definitions

message TransactionEvent {
    oneof event {
        CreditTransaction credit = 1;
        DebitTransaction debit = 2;
        Trade trade = 3;
        ....
        ..# other fields
    }
};

Using PySpark streaming, when I try to parse a message with the ParseFromString method, I get this error:

 File "./google.zip/google/protobuf/message.py", line 202, in ParseFromString
    return self.MergeFromString(serialized)
  File "./google.zip/google/protobuf/internal/python_message.py", line 1128, in MergeFromString
    if self._InternalParse(serialized, 0, length) != length:
  File "./google.zip/google/protobuf/internal/python_message.py", line 1178, in InternalParse
    raise message_mod.DecodeError('Field number 0 is illegal.')
google.protobuf.message.DecodeError: Field number 0 is illegal.

Is it because the message TransactionEvent has only a single field, and that field is a oneof?
I also tried adding a dummy int64 id field:

message TransactionEvent {
    int64 id = 1;
    oneof event {
        CreditTransaction credit = 2;
        DebitTransaction debit = 3;
        Trade trade = 4;
        ....
        ..# other fields
    }
};

but I still get the same error.
The code I am using:

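import schema_pb2  # generated protobuf module
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def parse_protobuf_from_bytes(msg_bytes):
    msg = schema_pb2.MarketDataEvent()
    msg.ParseFromString(msg_bytes)
    # Name of the oneof member that is set, or None if none is set
    eventStr = msg.WhichOneof("event")
    if eventStr == "credit":
        pass  # some code
    elif eventStr == "debit":
        pass  # some code

    return str(concatenatedFieldsValue)

parse_protobuf = udf(lambda x: parse_protobuf_from_bytes(x), StringType())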

kafka_conf = {
    "kafka.bootstrap.servers": "kafka.broker.com:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/transactions",
    "subscribe": "TRANSACTIONS",
    "startingOffsets": "earliest",
    "enable.auto.commit": False,
    "value.deserializer": "ByteArrayDeserializer",
    "group.id": "my-group"
}


df = spark.readStream \
            .format("kafka") \
            .options(**kafka_conf) \
            .load()

data = df.selectExpr("offset","CAST(key AS STRING)", "value") \
         .withColumn("event", parse_protobuf(col("value")))


df2 = data.select(col("offset"),col("event"))

If I just print the bytes without parsing them, I get this:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|offset |event                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|7777777|bytearray(b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_')                                                                                                                                                                                      |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Solution

  • The raw data you are trying to decode is:

    b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_'

    A valid protobuf message never starts with 0x00: the first byte is a field tag, and 0x00 would encode field number 0, which is illegal. The message appears to have some extra data at the beginning.

    Comparing with the protobuf encoding specification, we can try to make sense of this.

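    The string RIOT_230120P35.00 is correctly prefixed by its length, 0x11 (17 characters). The byte before that is 0x0A, which is the tag for field 1 with the length-delimited wire type used for strings, such as the date field in the CreditTransaction message. Reading the message backwards from there, everything looks reasonable up to the 0x1A byte: 0x1A is the tag for a length-delimited field 3, and the following byte 0x5D (93) matches the number of bytes that remain after it.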

    After stripping the first 7 bytes and converting to hex (1a 5d 0a 11 52 49 4f 54 5f 32 33 30 31 32 30 50 33 35 2e 30 30 10 80 a6 ae 82 d9 ed f1 fe 16 18 cd d9 d9 82 d9 ed f1 fe 16 20 e2 f7 d9 82 d9 ed f1 fe 16 28 95 a2 ed ff d9 ed f1 fe 16 30 8c aa ed ff d9 ed f1 fe 16 38 80 d1 b6 c1 0b 40 c0 8d a3 ba 0b 48 19 50 04 5a 02 51 5f 62 02 41 5f), the message is accepted by an online protobuf decoder.

    So the message seems to have 7 extra bytes at the beginning. These bytes do not conform to the protobuf format, and their meaning cannot be determined without information from the developer of the other endpoint of the communication.
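
The byte-level reasoning above can be reproduced with a few lines of standard-library Python. This is a minimal sketch, not a definitive fix: `read_varint`, `read_tag`, `strip_prefix` and `PREFIX_LEN` are hypothetical names introduced here, and the prefix length of 7 is an assumption taken from this single sample. The prefix layout resembles the Confluent Schema Registry wire format (a zero magic byte, a 4-byte schema ID, then message indexes), but that is a guess the question does not confirm, so check with the producer before relying on a fixed length.

```python
# Hex dump of the sample Kafka value from the question, split into the
# suspicious prefix and the part that decodes as protobuf.
PREFIX_HEX = "00 00 00 00 c2 02 0e"
BODY_HEX = (
    "1a 5d 0a 11 52 49 4f 54 5f 32 33 30 31 32 30 50 33 35 2e 30 30 "
    "10 80 a6 ae 82 d9 ed f1 fe 16 18 cd d9 d9 82 d9 ed f1 fe 16 "
    "20 e2 f7 d9 82 d9 ed f1 fe 16 28 95 a2 ed ff d9 ed f1 fe 16 "
    "30 8c aa ed ff d9 ed f1 fe 16 38 80 d1 b6 c1 0b 40 c0 8d a3 ba 0b "
    "48 19 50 04 5a 02 51 5f 62 02 41 5f"
)
RAW = bytes.fromhex(PREFIX_HEX + " " + BODY_HEX)

PREFIX_LEN = 7  # assumption: inferred from this one sample only


def read_varint(buf, pos):
    """Decode a base-128 varint at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return result, pos
        shift += 7


def read_tag(buf, pos):
    """Decode a field tag; return (field_number, wire_type, next_pos)."""
    key, pos = read_varint(buf, pos)
    return key >> 3, key & 0x07, pos


def strip_prefix(value, prefix_len=PREFIX_LEN):
    """Hypothetical helper: drop the leading non-protobuf bytes."""
    return bytes(value[prefix_len:])


# The unmodified value starts with a tag for field number 0, which is illegal.
bad_field, _, _ = read_tag(RAW, 0)

# After stripping the prefix, the outer tag and length are consistent.
payload = strip_prefix(RAW)
field, wire_type, pos = read_tag(payload, 0)
length, pos = read_varint(payload, pos)
remaining = len(payload) - pos

# The nested message starts with field 1, a length-delimited string.
inner = payload[pos:pos + length]
inner_field, inner_wire_type, p = read_tag(inner, 0)
str_len, p = read_varint(inner, p)
symbol = inner[p:p + str_len].decode("ascii")

print(bad_field)                             # 0
print(field, wire_type, length, remaining)   # 3 2 93 93
print(inner_field, inner_wire_type, symbol)  # 1 2 RIOT_230120P35.00
```

If the prefix really is a fixed 7 bytes for every record, the UDF in the question could call `msg.ParseFromString(strip_prefix(msg_bytes))` instead of passing the raw value. If the producer uses a schema-registry serializer, the matching deserializer is the more robust choice, because such a header is not guaranteed to have a constant length.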