pythonapache-kafkaapache-flinkpyflink

Unable to publish data to Kafka Topic using pyflink 1.17.1


I am trying to publish the data which was originally a list but I converted it to the string and then tried to push it to the Kafka topic as per this official documentation I tried the below code:

sink = KafkaSink.builder() \
            .set_bootstrap_servers("localhost:9092") \
            .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("test-topic45")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        ) \
            .build()
datastream.sink_to(sink)

but it threw the below error:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')

I even tried setting up the key serializer to SimpleStringSchema(which I don't think was needed) but same result.

Also, I don't need to convert explicitly since SimpleStringSchema will handle it for me also I have ensured my upstream layers are working fine. Absolutely no problem with that.

The upstream layer to this is the process function which returns a list of tuples of tuples and I haven't mentioned the output_type parameter for the process function. Should I mention it or will it be handled by SimpleStringSchema?

Is there anything else I am missing here?

Any hints are appreciated.


Solution

  • Yes, you're right. The error indicates the stream record doesn't match to what's been specified in the Kafka sink builder.

    You would find JSON to be more useful than STRING and the Flink document has an example about how to serialise Kafka key and value with JSON - see link

    Below shows a quick working example. Note you should have the connector JAR file in (./jars/flink-sql-connector-kafka-1.17.1.jar).

    import os
    
    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, TimeCharacteristic
    from pyflink.datastream.connectors.kafka import (
        KafkaSink,
        KafkaRecordSerializationSchema,
        DeliveryGuarantee,
    )
    from pyflink.datastream.formats.json import JsonRowSerializationSchema
    
    BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:29092")
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
    SRC_DIR = os.path.dirname(os.path.realpath(__file__))
    jar_files = ["flink-sql-connector-kafka-1.17.1.jar"]
    jar_paths = tuple([f"file://{os.path.join(SRC_DIR, 'jars', name)}" for name in jar_files])
    print(jar_paths)
    env.add_jars(*jar_paths)
    
    value_type_info = Types.ROW_NAMED(
        field_names=["data"],
        field_types=[Types.STRING()],
    )
    
    source_stream = env.from_collection(
        collection=[
            [
                (("user1", "gold"), ("user2", "gold"), ("user5", "gold")),
                (("user3", "gold"), ("user4", "gold"), ("user6", "gold")),
            ]
        ]
    )
    
    
    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(BOOTSTRAP_SERVERS)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("sink-demo")
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder().with_type_info(value_type_info).build()
            )
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )
    
    # source_stream.map(lambda e: Row(data=str(e)), output_type=value_type_info).print()
    source_stream.map(lambda e: Row(data=str(e)), output_type=value_type_info).sink_to(sink).name(
        "sink-demo"
    ).uid("sink-demo")
    
    env.execute("Kafka sink example.")
    

    If successful, you should be able see the following Kafka message.

    enter image description here

    Also the message value can be parsed as shown below. Note it only works with Python and wouldn't be recommended if the messages are shared by other data types.

    enter image description here