I set up Kafka on WSL2 and was able to produce and consume messages on a topic I created inside the WSL2 instance. I then used the Kafka connector provided by WSO2 (Micro Integrator deployed on my local Windows system) to send a JSON message to the topic, and that worked. However, when I tried to read the messages following this documentation, I was unable to do so. The Micro Integrator kept logging errors implying that it could not connect to the topic and consume the messages, as shown below:
My inbound endpoint is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="kafkaListenerInboundEp" onError="kafkaListenerSequence" sequence="kafkaListenerSequence" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="topic.name">demo-test</parameter>
        <parameter name="group.id">test-consumer-group</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="poll.timeout">1000</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="interval">1000</parameter>
        <parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
    </parameters>
</inboundEndpoint>
Any help would be appreciated!
It turns out that, for some reason, the latest inbound endpoint connector does not work with the version of Micro Integrator I was using (4.0.0). I installed 4.2.0 and it worked perfectly on the first try.
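For anyone hitting similar errors, it may be worth ruling out a plain network problem between Windows and WSL2 before changing versions. Below is a minimal sketch of a TCP reachability check, assuming the broker address is localhost:9092 as in the bootstrap.servers parameter above. The function name broker_reachable is hypothetical and not part of Kafka or WSO2. Note that a successful TCP connection only shows the port is open; it does not prove the broker's advertised listeners are usable by the client.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration; it only tests the socket,
    not the Kafka protocol itself.
    """
    try:
        # create_connection raises OSError on refusal or timeout
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumption: same address as bootstrap.servers in the endpoint config
    print(broker_reachable("localhost", 9092))
```

If this prints True when run on the Windows side and the consumer still fails, the cause is more likely on the connector side (as it was in my case) than in the network path.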