I have a Kafka topic whose messages are protobuf-encoded in the following format:
message CreditTransaction {
    string date = 1;
    float amount = 2;
}
message DebitTransaction {
    string date = 1;
    float amount = 2;
}
...
.. # other message definitions
message TransactionEvent {
    oneof event {
        CreditTransaction credit = 1;
        DebitTransaction debit = 2;
        Trade trade = 3;
        ....
        ..# other fields
    }
};
Using PySpark streaming, when I try to parse a message with the ParseFromString
method, I get this error:
  File "./google.zip/google/protobuf/message.py", line 202, in ParseFromString
    return self.MergeFromString(serialized)
  File "./google.zip/google/protobuf/internal/python_message.py", line 1128, in MergeFromString
    if self._InternalParse(serialized, 0, length) != length:
  File "./google.zip/google/protobuf/internal/python_message.py", line 1178, in InternalParse
    raise message_mod.DecodeError('Field number 0 is illegal.')
google.protobuf.message.DecodeError: Field number 0 is illegal.
Is it because the TransactionEvent message has only a single field, and that field is a oneof?
I also tried adding a dummy int64 id field:
message TransactionEvent {
    int64 id = 1;
    oneof event {
        CreditTransaction credit = 2;
        DebitTransaction debit = 3;
        Trade trade = 4;
        ....
        ..# other fields
    }
};
but I still get the same error.
This is the code I am using:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def parse_protobuf_from_bytes(msg_bytes):
    msg = schema_pb2.MarketDataEvent()
    msg.ParseFromString(msg_bytes)
    eventStr = msg.WhichOneof("event")
    if eventStr=="credit":
        # some code
    elif eventStr=="debit":
        # some code
    return str(concatenatedFieldsValue)

parse_protobuf = udf(lambda x: parse_protobuf_from_bytes(x), StringType())

kafka_conf = {
    "kafka.bootstrap.servers": "kafka.broker.com:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/transactions",
    "subscribe": "TRANSACTIONS",
    "startingOffsets": "earliest",
    "enable.auto.commit": False,
    "value.deserializer": "ByteArrayDeserializer",
    "group.id": "my-group"
}

df = spark.readStream \
    .format("kafka") \
    .options(**kafka_conf) \
    .load()

data = df.selectExpr("offset","CAST(key AS STRING)", "value") \
    .withColumn("event", parse_protobuf(col("value")))
df2 = data.select(col("offset"),col("event"))
If I just print the bytes without parsing them, I get this:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|offset |event |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|7777777|bytearray(b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_') |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The raw data you are trying to decode is:
b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_'
A valid protobuf message never starts with 0x00. The first byte of a message is a field tag, and 0x00 would decode to field number 0, which is not a legal field number. The message appears to have some extra data at the beginning.
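To make the tag argument concrete: in the protobuf wire format a tag is the varint `(field_number << 3) | wire_type`. The sketch below splits a single-byte tag into its two parts. The helper name `split_tag` is made up for this illustration, and it only handles tags that fit in one byte (field numbers up to 15).

```python
def split_tag(tag_byte):
    """Split a one-byte protobuf tag into (field_number, wire_type)."""
    # The low 3 bits hold the wire type, the remaining bits the field number.
    return tag_byte >> 3, tag_byte & 0x07

print(split_tag(0x00))  # (0, 0): field number 0, hence the DecodeError
print(split_tag(0x0A))  # (1, 2): field 1, length-delimited (string, bytes, submessage)
print(split_tag(0x1A))  # (3, 2): field 3, length-delimited
```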
Comparing with the protobuf encoding specification, we can try to make sense of this.
Starting from the string RIOT_230120P35.00, it is correctly prefixed by its length, 0x11 (17 characters). The previous byte is 0x0A, which is the tag for field 1 with a length-delimited type such as string, as in the CreditTransaction message. Reading the message backwards from there, everything looks reasonable up to the 0x1A byte, which is the tag for a length-delimited field 3 and is followed by the length 0x5D (93 bytes).
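The backwards reading can be reproduced with a few lines of Python. This is only a sketch over the first bytes of the sample value from the question. It assumes that every tag and length involved fits in a single byte, which holds for this sample but not in general.

```python
# First bytes of the sample value, copied from the question.
head = b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00'
text = b'RIOT_230120P35.00'

start = head.index(text)        # offset where the string begins
length_byte = head[start - 1]   # length prefix of the string
tag_byte = head[start - 2]      # tag of the field holding the string
outer_len = head[start - 3]     # length of the enclosing submessage
outer_tag = head[start - 4]     # tag of the enclosing field

print(length_byte == len(text))          # True: 0x11 == 17
print(tag_byte >> 3, tag_byte & 0x07)    # 1 2
print(outer_tag >> 3, outer_tag & 0x07)  # 3 2
print(start - 4)                         # 7 bytes precede the first plausible tag
```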
After stripping the first 7 bytes and converting to hex (1a 5d 0a 11 52 49 4f 54 5f 32 33 30 31 32 30 50 33 35 2e 30 30 10 80 a6 ae 82 d9 ed f1 fe 16 18 cd d9 d9 82 d9 ed f1 fe 16 20 e2 f7 d9 82 d9 ed f1 fe 16 28 95 a2 ed ff d9 ed f1 fe 16 30 8c aa ed ff d9 ed f1 fe 16 38 80 d1 b6 c1 0b 40 c0 8d a3 ba 0b 48 19 50 04 5a 02 51 5f 62 02 41 5f), the message is accepted by an online protobuf decoder.
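The stripping and hex conversion can be done directly in Python, which also allows a quick consistency check: the length declared after the outer tag should cover exactly the rest of the value. This is a sketch that again assumes the outer length fits in one byte. `bytes.hex` with a separator needs Python 3.8 or later.

```python
# Sample Kafka value, copied from the question.
raw = b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_'

payload = raw[7:]            # drop the 7 leading bytes
hex_dump = payload.hex(" ")  # space-separated hex, ready to paste into a decoder
print(hex_dump)

declared_len = payload[1]          # byte after the 0x1a tag
remaining_len = len(payload) - 2   # everything after tag and length
print(declared_len, remaining_len)  # both 93, so the payload is self-consistent
```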
It seems the message has 7 extra bytes at the beginning for some reason. These bytes do not conform to the protobuf format, and their meaning cannot be determined without information from the developer of the other endpoint of the communication.
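If the prefix really is always 7 bytes long, a workaround is to slice it off before parsing. The sketch below assumes exactly that, based on the single sample above. The helper name `parse_skipping_prefix` and the `message_cls` parameter are made up for this illustration.

```python
PREFIX_LEN = 7  # assumption: inferred from one sample, not from a specification

def parse_skipping_prefix(msg_bytes, message_cls, prefix_len=PREFIX_LEN):
    """Parse msg_bytes into a new message_cls instance, ignoring the leading prefix."""
    msg = message_cls()
    # Spark hands the UDF a bytearray; convert the slice to bytes before parsing.
    msg.ParseFromString(bytes(msg_bytes[prefix_len:]))
    return msg
```

Inside the UDF this would be called as `parse_skipping_prefix(msg_bytes, schema_pb2.MarketDataEvent)`. A fixed slice is only safe if the prefix has a constant length. Some schema-registry serializers, for example, prepend a marker byte, a 4-byte schema ID and a variable-length index list, which would fit these bytes, but that is a guess and not something the sample proves. Confirm the framing with whoever produces the messages.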