apache-kafkawso2windows-subsystem-for-linuxwso2-esbconnector

WSO2 Kafka Consume messages from topic issue


So I setup kafka on wsl2 and was able to produce and consume messages to/from a topic I created there in the wsl2 instance. After that I used the kafka connector provided in wso2 (micro-integrator deployed on my local windows system) to send a json message to the topic and was successful. However when I tried to read the messages following this documentation, I was unable to do so. The micro-integrator would keep showing errors implying that it was unable to connect to the topic and consume the messages as shown below: ss of the mi logs

My EP is configured as follows:

 <?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="kafkaListenerInboundEp" onError="kafkaListenerSequence" sequence="kafkaListenerSequence" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="topic.name">demo-test</parameter>
        <parameter name="group.id">test-consumer-group</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="poll.timeout">1000</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="interval">1000</parameter>
        <parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
    </parameters>
</inboundEndpoint>

Any help would be appreciated!


Solution

  • Turns our for some reason the latest inbound endpoint connector does not work with the version of micro-integrator that I was using(4.0.0), so I installed 4.2.0 and it worked perfectly on the first try.