Tags: java, akka-stream, alpakka, reactive-kafka, akka-kafka

Incompatible equality constraint while using Akka Kafka Streams


I am trying to use Akka Kafka Streams following the Akka Kafka Streams documentation. Here is the code I have:

// Consumer settings typed as <byte[], ETLProcessMessage>: keys are read with
// ByteArrayDeserializer and values with KafkaJacksonSerializer bound to ETLProcessMessage.
ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings
                .create(actorSystem, new ByteArrayDeserializer(), new KafkaJacksonSerializer<>(ETLProcessMessage.class))
                .withBootstrapServers(kafkaServers)
                .withGroupId(consumerGroupId)
                // Sets the consumer property named by AUTO_OFFSET_RESET_CONFIG to "earliest".
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Builds a committable source on topicName, handles each record value asynchronously
// (mapAsync with parallelism 3), and discards the resulting Done elements via Sink.ignore().
// Note: nothing in this snippet commits the message offsets.
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
                .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> {
                    // supplyAsync is called without an executor argument here.
                    handlePartitionedRequest(msg.record().value());
                    return Done.getInstance();
                }))
                .runWith(Sink.ignore(), materializer);

But the above code produces a compiler error at runWith() (the original post embedded a screenshot of the error here):

Here is the code for KafkaJacksonSerializer:

import com.adaequare.mapro.common.exception.AppException;
import com.adaequare.mapro.config.jackson.PostConstructDeserializer;
import com.adaequare.mapro.model.transformer.JSONTransformer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.io.CharStreams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;

/**
 * Kafka {@link Serializer} and {@link Deserializer} that converts values of type {@code T}
 * to and from JSON bytes using Jackson.
 *
 * <p>The Jackson mapper built by {@link #KafkaJacksonSerializer(Class)} ignores unknown JSON
 * properties, maps fields directly (getters, is-getters, setters and creators are not
 * auto-detected), and wraps every bean deserializer in a {@code PostConstructDeserializer}.
 *
 * @param <T> the value type that is written to and read from Kafka
 */
public class KafkaJacksonSerializer<T> implements Serializer<T>, Deserializer<T>{
    private ObjectReader objectReader;
    private ObjectWriter objectWriter;
    private ObjectMapper objectMapper;

    /**
     * Creates an instance without a configured reader or writer.
     *
     * <p>Neither this constructor nor {@link #configure(Map, boolean)} initializes the Jackson
     * reader/writer, so serializing or deserializing non-null data on such an instance fails
     * with a {@link NullPointerException}. Prefer {@link #KafkaJacksonSerializer(Class)}.
     */
    public KafkaJacksonSerializer(){
    }

    /**
     * Creates a serializer/deserializer bound to the given target class.
     *
     * @param persistentClass the class that incoming JSON is deserialized into
     */
    public KafkaJacksonSerializer(Class<T> persistentClass) {
        objectMapper = new ObjectMapper();

        // Wrap every bean deserializer so PostConstructDeserializer takes part in deserialization.
        SimpleModule module = new SimpleModule();
        module.setDeserializerModifier(new BeanDeserializerModifier() {
            @Override
            public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config,
                                                          BeanDescription beanDesc, final JsonDeserializer<?> deserializer) {
                return new PostConstructDeserializer(deserializer);
            }
        });
        objectMapper.registerModule(module);

        // Tolerate JSON properties that the target class does not declare.
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Field-only mapping: any field is visible, accessor methods and creators are not.
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
                .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));

        objectReader = objectMapper.readerFor(persistentClass);
        objectWriter = objectMapper.writer();
    }

    /**
     * Deserializes a JSON payload into an instance of {@code T}.
     *
     * @param topic the topic the record came from (unused)
     * @param data  the raw record bytes; may be {@code null}
     * @return the deserialized value, or {@code null} when {@code data} is {@code null}
     * @throws AppException if the payload cannot be parsed (the {@link IOException} is the cause)
     */
    @Override
    public T deserialize(String topic, byte[] data) {
        // Guard the payload itself: the previous check tested a freshly constructed stream,
        // which is never null, so a null payload caused a NullPointerException instead.
        if (data == null) {
            return null;
        }

        try {
            // Hand the bytes straight to Jackson instead of decoding them to a String with the
            // platform default charset, which could corrupt non-ASCII content.
            return objectReader.readValue(data);
        } catch (IOException e) {
            throw AppException.forException("Error while unmarshalling AssetData JSON: "+e.getMessage(), e);
        }
    }

    /** No configuration is read; this method intentionally does nothing. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    /**
     * Serializes a value to JSON bytes.
     *
     * @param topic the destination topic (unused)
     * @param data  the value to serialize; may be {@code null}
     * @return the JSON bytes, or {@code null} when {@code data} is {@code null}
     * @throws AppException if the value cannot be written (the {@link IOException} is the cause)
     */
    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }

        try {
            return objectWriter.writeValueAsBytes(data);
        } catch (IOException e) {
            throw AppException.forException("Error while marshalling JSON: "+e.getMessage(), e);
        }
    }

    /** Holds no resources; this method intentionally does nothing. */
    @Override
    public void close() {
    }
}

I am not sure what exactly the problem is. However, the code below compiles without any error:

// Same pipeline shape as the failing snippet, but with a StringDeserializer for values
// and a variable declared with the raw type ConsumerSettings (no type arguments).
// NOTE(review): the raw type presumably relaxes generic checking along the chain, which may be
// why this variant compiles — confirm.
ConsumerSettings newconsumerSettings = ConsumerSettings
                .create(actorSystem, new ByteArrayDeserializer(), new StringDeserializer())
                .withBootstrapServers(kafkaServers)
                .withGroupId(consumerGroupId)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Each message is mapped to Done without inspecting the record; results are ignored.
        Consumer.committableSource(newconsumerSettings, Subscriptions.topics("topic2"))
                .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> Done.getInstance()))
                .runWith(Sink.ignore(), materializer);

Can someone please help me identify what is going wrong here?


Solution

  • There was an Akka version mismatch between the dependencies I had added. Once I aligned them to the same version, the compilation error went away.

    Here are the dependencies I was using:

    compile 'com.typesafe.akka:akka-actor_2.12:2.5.4'
    compile 'com.typesafe.akka:akka-cluster_2.12:2.5.4'
    compile 'com.typesafe.akka:akka-cluster-tools_2.12:2.5.4'
    compile 'com.typesafe.akka:akka-slf4j_2.12:2.5.4'
    

    This is the dependency I newly added for Reactive Kafka:

    compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.21'
    

    After upgrading the Akka (actor/cluster-related) dependencies to 2.5.9, the compilation error was gone.