I am working on a user-defined Flink MQTT connector.
https://github.com/yinjilong/StoneForests-flink-mqtt-connector
However, I encounter a serialization problem when I try to write messages in JSON
format.
In the class public class MqttSinkFunction<T> extends RichSinkFunction<T>, I override
the invoke method as follows:
@Override
public void invoke(T event, Context context) {
    if (log.isDebugEnabled()) {
        log.debug("sink invoke...");
        log.debug("message is {}", event);
    }
    byte[] payload;
    try {
        payload = this.serializer.serialize(event);
    } catch (Exception e) {
        log.error("Cannot serialize event {}", event, e);
        return;
    }
    try {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(this.qos);
        String[] topics = this.topics.split(",");
        for (String topic : topics) {
            if (log.isDebugEnabled()) {
                log.debug("send message [{}] to topic [{}]", message, topic);
            }
            this.client.publish(topic, message);
        }
    } catch (Exception e) {
        log.error("Cannot sink MQTT event {} to {}", event, hostUrl, e);
    }
}
I use the following SQL to write a message:
$ sql-client.sh
CREATE TABLE sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'mqtt',
    'hostUrl' = 'tcp://localhost:1883',
    'username' = '',
    'password' = '',
    'sinkTopics' = 'test/mytopic',
    'format' = 'json'
);
INSERT INTO sink (id, name) VALUES (1, 'Jeen');
It seems that the serialization of the event (RowData) is failing.
The event is shown as
+I(1,Jeen)
I thought that, with the JSON format, the internal serialization could infer the JSON schema from the given SQL table schema automatically, but it fails and throws an exception.
What can I try next?
An NPE causes the serialization of the event (RowData) to fail, because
org.apache.flink.formats.json.JsonRowDataSerializationSchema#mapper
is null in your com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#serializer
instance; the mapper is only initialized when the serialization schema's open method is called.
You can fix it by adding one line to the com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#open
method:
this.serializer.open(null);
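For illustration, here is a minimal sketch of what that open method could look like with the call added. The field names (serializer, client, hostUrl, username, password) and the MQTT connect logic are assumptions about the connector's code, not copied from the repository:

// Sketch only: assumed to live inside MqttSinkFunction<T>.
import org.apache.flink.configuration.Configuration;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    // The important line: initialize the JSON serialization schema so that its
    // internal ObjectMapper exists before serialize() is ever called in invoke().
    // Passing null matches the fix above; if you prefer a non-null context, Flink also
    // offers RuntimeContextInitializationContextAdapters.serializationAdapter(getRuntimeContext()).
    this.serializer.open(null);

    // Assumed setup: connect the MQTT client, roughly as the connector presumably already does.
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(this.username);
    options.setPassword(this.password.toCharArray());
    this.client = new MqttClient(this.hostUrl, MqttClient.generateClientId());
    this.client.connect(options);
}

Since open runs once per sink subtask before any record is processed, initializing the serialization schema here guarantees the mapper exists by the time invoke serializes events.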