apache-flinkpyflink

CustomDeserializer not working for accepting string as tuples in pyflink


I have incoming Kafka data which looks like ("field1", "field2"). So, now I am trying to parse this input via Pyflink 1.17.1 using a custom deserializer which looks like the below by referring this link:

from pyflink.common import DeserializationSchema, Types, TypeInformation
from model.exceptions import SystemException

class StringToTupleDeserializationSchema(DeserializationSchema):
    
    def __init__(self):
        super().__init__()

    def deserialize(self, message):
        parts = message.split(',')
        try:
            if len(parts) == 2:
                return (parts[0], parts[1])
        except Exception as e:
            raise SystemException(e)

    def get_produced_type(self):
        return TypeInformation.of((Types.STRING(), Types.STRING()))

Now, instead of SimpleStringSchema() I passed this Class as the input to KafkaSource as below:

source = KafkaSource.builder() \
            .set_bootstrap_servers("localhost:9092") \
            .set_topics("test-topic1") \
            .set_group_id("my-group") \
            .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
            .set_value_only_deserializer(StringToTupleDeserializationSchema()) \
            .build()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
       

But this raises an error:

py4j.protocol.Py4JJavaError: An error occurred while calling o25.fromSource.
: java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2643)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2015)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

I can get the elements I want using string.strip and string.split but that's not an efficient way to access the elements.

What am I missing here?

TIA


Solution

  • Kafka messages are not string but bytes and they would have to be converted into string at the first place?

    When you check SimpleStringSchema or JsonRowDeserializationSchema, they utilize the relevant JAVA class and I guess those classes converts byte into string internally.

    I'd recommend to deserialize the values with SimpleStringSchema and apply a map function to split it into tuple.

    enter image description here