
spring-integration-kafka 1.3.1.RELEASE : Any workaround to use org.apache.kafka.common.serialization?


I'm using Spring XD and have run into this issue:

Is there any workaround to deserialize messages that I'm receiving from a Kafka topic (version 0.9)?

I have been trying to modify the dependencies to keep spring-integration-kafka 1.3.1 while using the latest Apache Kafka dependencies for serialization, like this:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.0.RELEASE</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
        <scope>provided</scope>
    </dependency>

When I deploy my stream (assuming a kafka source | log), I get these warnings:

    16:49:43,171 WARN DeploymentsPathChildrenCache-0 utils.VerifiableProperties - Property key.deserializer is not valid
    16:49:43,172 WARN DeploymentsPathChildrenCache-0 utils.VerifiableProperties - Property value.deserializer is not valid

And, as expected, the payload is then logged as a raw byte array that was never deserialized:

2017-05-10T16:50:43-0400 1.3.0.RELEASE INFO task-scheduler-8 sink.probando_deserializer - {probando_topic={0=[[B@6fbfdb37]}}

Solution

  • Spring XD uses an old version of Spring Integration Kafka (1.x), which only supports the 0.8.x.x Kafka client. That is presumably why the key.deserializer and value.deserializer properties, which belong to the newer consumer API, are reported as not valid.

    Spring Integration Kafka 2.x supports clients from 0.9.x.x through 0.10.2.x; it's based on the spring-kafka project.

    You would need to create a custom source based on the newer Spring Integration module.

    Version 2.1.0.RELEASE should work with spring-kafka 1.2.x and the 0.10.2.x client.
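
    As a rough sketch, the dependencies of such a custom source module might look like the fragment below. The version numbers are the ones mentioned in this thread; the scopes, and whether these jars load cleanly alongside the libraries Spring XD itself ships, are assumptions you would need to verify in your own module.

        <!-- Sketch only: newer Spring Integration Kafka module, built on spring-kafka -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.0.RELEASE</version>
        </dependency>

        <!-- Assumption: bundled with the module rather than marked "provided",
             since Spring XD itself only provides the 0.8.x client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>

    With these in place, the classes in org.apache.kafka.common.serialization (for example StringDeserializer) can be set as key.deserializer and value.deserializer on the consumer used by the custom source, instead of being passed to the 1.x module.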