I am trying to publish data which was originally a list, but I converted it to a string and then tried to push it to the Kafka topic. As per this official documentation, I tried the code below:
sink = KafkaSink.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("test-topic45")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    ) \
    .build()

datastream.sink_to(sink)
but it threw the below error:
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
I even tried setting the key serializer to SimpleStringSchema (which I don't think was needed), but got the same result.
Also, I shouldn't need to convert explicitly, since SimpleStringSchema will handle it for me. I have also ensured my upstream layers are working fine; there is absolutely no problem there.
The upstream layer to this is a process function which returns a list of tuples of tuples, and I haven't specified the output_type parameter for the process function. Should I specify it, or will it be handled by SimpleStringSchema?
Is there anything else I am missing here?
Any hints are appreciated.
Yes, you're right. The error indicates that the stream records don't match what's been specified in the Kafka sink builder: without an output_type they arrive on the Java side as pickled byte arrays ([B), which SimpleStringSchema cannot cast to java.lang.String.
You may find JSON more useful than a plain string, and the Flink documentation has an example of how to serialise a Kafka key and value with JSON - see link.
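That said, if you'd rather keep SimpleStringSchema, the key is to hand the sink actual strings, i.e. declare output_type=Types.STRING() on the step that stringifies your records. A minimal sketch (assuming datastream is the output of your process function; the names here are illustrative):

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

# Declare the output type so records cross to the Java side as Strings, not pickled bytes.
string_stream = datastream.map(lambda e: str(e), output_type=Types.STRING())

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("test-topic45")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

string_stream.sink_to(sink)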
If you go with JSON, below is a quick working example. Note you should have the Kafka connector JAR file in place (./jars/flink-sql-connector-kafka-1.17.1.jar).
import os

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, TimeCharacteristic
from pyflink.datastream.connectors.kafka import (
    KafkaSink,
    KafkaRecordSerializationSchema,
    DeliveryGuarantee,
)
from pyflink.datastream.formats.json import JsonRowSerializationSchema

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:29092")

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

# Register the Kafka connector JAR that sits under ./jars next to this script.
SRC_DIR = os.path.dirname(os.path.realpath(__file__))
jar_files = ["flink-sql-connector-kafka-1.17.1.jar"]
jar_paths = tuple([f"file://{os.path.join(SRC_DIR, 'jars', name)}" for name in jar_files])
print(jar_paths)
env.add_jars(*jar_paths)

# The sink value is a single-field Row; the "data" field carries the stringified record.
value_type_info = Types.ROW_NAMED(
    field_names=["data"],
    field_types=[Types.STRING()],
)

# Mimics the upstream process function output: one element that is a list of tuples of tuples.
source_stream = env.from_collection(
    collection=[
        [
            (("user1", "gold"), ("user2", "gold"), ("user5", "gold")),
            (("user3", "gold"), ("user4", "gold"), ("user6", "gold")),
        ]
    ]
)

# Serialise the Row value as JSON using the type info declared above.
sink = (
    KafkaSink.builder()
    .set_bootstrap_servers(BOOTSTRAP_SERVERS)
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("sink-demo")
        .set_value_serialization_schema(
            JsonRowSerializationSchema.builder().with_type_info(value_type_info).build()
        )
        .build()
    )
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
)

# Map each element into a Row - the output_type is what keeps the records compatible with the sink.
# source_stream.map(lambda e: Row(data=str(e)), output_type=value_type_info).print()
source_stream.map(lambda e: Row(data=str(e)), output_type=value_type_info).sink_to(sink).name(
    "sink-demo"
).uid("sink-demo")

env.execute("Kafka sink example.")
If successful, you should be able to see a Kafka message like the following.
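The value below is illustrative rather than captured output; it's just the str() of the sample list wrapped in the single data field:

{"data": "[(('user1', 'gold'), ('user2', 'gold'), ('user5', 'gold')), (('user3', 'gold'), ('user4', 'gold'), ('user6', 'gold'))]"}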
Also, the message value can be parsed back as shown below. Note this only works from Python and wouldn't be recommended if the messages are consumed by non-Python applications.
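A minimal sketch of that parsing (the raw value here is the illustrative message from above; ast.literal_eval works because the data field is a stringified Python literal):

import ast
import json

# Illustrative raw message value from the sink-demo topic, as produced by the example above.
raw = '{"data": "[((\'user1\', \'gold\'), (\'user2\', \'gold\'), (\'user5\', \'gold\')), ((\'user3\', \'gold\'), (\'user4\', \'gold\'), (\'user6\', \'gold\'))]"}'

value = json.loads(raw)
# ast.literal_eval turns the stringified Python literal back into a list of tuples.
records = ast.literal_eval(value["data"])
print(records[0][0])  # ('user1', 'gold')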