I am trying to use Akka Streams Kafka (reactive-kafka), following its documentation. Here is the code I have:
ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings
    .create(actorSystem, new ByteArrayDeserializer(), new KafkaJacksonSerializer<>(ETLProcessMessage.class))
    .withBootstrapServers(kafkaServers)
    .withGroupId(consumerGroupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
    .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> {
        handlePartitionedRequest(msg.record().value());
        return Done.getInstance();
    }))
    .runWith(Sink.ignore(), materializer);
However, the code above shows a compiler error at runWith().
Here is the code for KafkaJacksonSerializer:
import com.adaequare.mapro.common.exception.AppException;
import com.adaequare.mapro.config.jackson.PostConstructDeserializer;
import com.adaequare.mapro.model.transformer.JSONTransformer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.io.CharStreams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
public class KafkaJacksonSerializer<T> implements Serializer<T>, Deserializer<T> {
    private ObjectReader objectReader;
    private ObjectWriter objectWriter;
    private ObjectMapper objectMapper;

    public KafkaJacksonSerializer() {
    }

    public KafkaJacksonSerializer(Class<T> persistentClass) {
        objectMapper = new ObjectMapper();
        SimpleModule module = new SimpleModule();
        module.setDeserializerModifier(new BeanDeserializerModifier() {
            @Override
            public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config,
                    BeanDescription beanDesc, final JsonDeserializer<?> deserializer) {
                return new PostConstructDeserializer(deserializer);
            }
        });
        objectMapper.registerModule(module);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Serialize and deserialize through fields only; ignore getters, setters and creators.
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
            .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
            .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
            .withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
            .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
            .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
        objectReader = objectMapper.readerFor(persistentClass);
        objectWriter = objectMapper.writer();
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Check the payload itself: a newly constructed stream is never null,
        // and ByteArrayInputStream throws a NullPointerException for null data.
        if (data == null) {
            return null;
        }
        InputStream stream = new ByteArrayInputStream(data);
        try {
            String json = CharStreams.toString(new InputStreamReader(stream));
            return objectReader.readValue(json);
        } catch (IOException e) {
            throw AppException.forException("Error while unmarshalling AssetData JSON: " + e.getMessage(), e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectWriter.writeValueAsBytes(data);
        } catch (IOException e) {
            throw AppException.forException("Error while marshalling JSON: " + e.getMessage(), e);
        }
    }

    @Override
    public void close() {
    }
}
I am not sure what exactly the problem is, but the code below compiles without any error:
ConsumerSettings newconsumerSettings = ConsumerSettings
    .create(actorSystem, new ByteArrayDeserializer(), new StringDeserializer())
    .withBootstrapServers(kafkaServers)
    .withGroupId(consumerGroupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer.committableSource(newconsumerSettings, Subscriptions.topics("topic2"))
    .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> Done.getInstance()))
    .runWith(Sink.ignore(), materializer);
Can someone please help me identify what is going wrong here?
There was an Akka version mismatch between the dependencies I had added. Once I aligned them to the same version, the compilation error disappeared.
Here are the dependencies I was using:
compile 'com.typesafe.akka:akka-actor_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster-tools_2.12:2.5.4'
compile 'com.typesafe.akka:akka-slf4j_2.12:2.5.4'
This is the dependency I newly added for reactive-kafka:
compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.21'
After upgrading the Akka (actor/cluster related) dependencies to 2.5.9, the compilation error is gone.
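For anyone who wants to catch this kind of mismatch early, here is a minimal sketch of a check over dependency coordinates. AkkaVersionCheck and its methods are hypothetical helpers I made up for illustration; they are not part of Akka or Gradle. The akka-stream entry in main is an assumed example of a transitively resolved module, not something taken from my actual build output. akka-stream-kafka is skipped because it follows its own version scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not an Akka or Gradle API): groups Akka core modules by version.
class AkkaVersionCheck {

    // akka-stream-kafka is versioned independently of Akka itself, so it is not compared.
    private static final String SEPARATELY_VERSIONED = "akka-stream-kafka";

    /** Maps each version to the Akka core artifacts declared with that version. */
    static Map<String, List<String>> versionsInUse(List<String> coordinates) {
        Map<String, List<String>> byVersion = new TreeMap<>();
        for (String coordinate : coordinates) {
            String[] parts = coordinate.split(":");
            if (parts.length != 3 || !parts[0].equals("com.typesafe.akka")) {
                continue; // not a group:artifact:version coordinate in the Akka group
            }
            if (parts[1].startsWith(SEPARATELY_VERSIONED)) {
                continue;
            }
            byVersion.computeIfAbsent(parts[2], v -> new ArrayList<>()).add(parts[1]);
        }
        return byVersion;
    }

    /** True when all checked Akka modules share a single version. */
    static boolean isAligned(List<String> coordinates) {
        return versionsInUse(coordinates).size() <= 1;
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList(
            "com.typesafe.akka:akka-actor_2.12:2.5.4",
            "com.typesafe.akka:akka-stream_2.12:2.5.9", // assumed transitive version, for illustration
            "com.typesafe.akka:akka-stream-kafka_2.12:0.21");
        System.out.println(versionsInUse(declared));
        System.out.println("aligned: " + isAligned(declared));
    }
}
```

In practice the input list would come from the resolved dependency tree of the build rather than being hard-coded; if more than one version shows up, the Akka modules should be pinned to the same version.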