I have incoming Kafka data that looks like ("field1", "field2"). I am trying to parse this input with PyFlink 1.17.1 using a custom deserializer, written by referring to this link, which looks like the below:
from pyflink.common import DeserializationSchema, Types, TypeInformation
from model.exceptions import SystemException


class StringToTupleDeserializationSchema(DeserializationSchema):
    def __init__(self):
        super().__init__()

    def deserialize(self, message):
        parts = message.split(',')
        try:
            if len(parts) == 2:
                return (parts[0], parts[1])
        except Exception as e:
            raise SystemException(e)

    def get_produced_type(self):
        return TypeInformation.of((Types.STRING(), Types.STRING()))
Now, instead of SimpleStringSchema(), I passed this class as the value deserializer to KafkaSource, as below:
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("test-topic1") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(StringToTupleDeserializationSchema()) \
    .build()

ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
But this raises an error:
py4j.protocol.Py4JJavaError: An error occurred while calling o25.fromSource.
: java.lang.NullPointerException
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2643)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2015)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
I can get the elements I want by using string.strip and string.split on the raw messages, but that does not feel like an efficient way to access the elements. What am I missing here?
TIA
Kafka messages are not strings but bytes, so they have to be converted into strings in the first place. When you check SimpleStringSchema or JsonRowDeserializationSchema, they delegate to the corresponding Java classes, and I suspect those classes convert the bytes into strings internally.
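To illustrate that first point with a plain-Python sketch (the byte string here is just an assumed sample value): the raw Kafka record value arrives as bytes, and decoding is the step a Python-side deserializer would have to perform itself, while SimpleStringSchema does the equivalent on the Java side.

```python
raw = b"field1,field2"           # what a Kafka record value actually is: bytes
decoded = raw.decode("utf-8")    # the decoding step SimpleStringSchema handles internally
print(decoded)                   # "field1,field2" — now a str you can split
```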
I'd recommend deserializing the values with SimpleStringSchema and applying a map function to split each string into a tuple.
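A minimal sketch of that approach, reusing the broker/topic settings from the question; the helper name split_message is hypothetical, and the commented-out part requires a running Kafka broker:

```python
def split_message(message: str):
    # Parse a "field1,field2" string into a 2-tuple; partition on the
    # first comma only, so a second field containing commas stays intact.
    key, _, value = message.strip().partition(',')
    return key, value

# Applied to the stream from the question (sketch):
#
# source = KafkaSource.builder() \
#     .set_bootstrap_servers("localhost:9092") \
#     .set_topics("test-topic1") \
#     .set_group_id("my-group") \
#     .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
#     .set_value_only_deserializer(SimpleStringSchema()) \
#     .build()
#
# ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
# tuples = ds.map(split_message,
#                 output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))
```

Declaring the output_type on the map operator gives Flink the tuple type information that the custom schema's get_produced_type was failing to provide.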