Run a Spring Boot 3.2.4 Reactor Kafka producer as a GraalVM 21 native image
A simple Spring Boot Reactor Kafka producer project works fine as a non-native (JVM) image. That image has been built many times and has run under both low and high load without any issue.
Now I am trying to convert the job into a native image.
I expected the same project to build and run fine.
The native image builds without issue, but at runtime it fails with an error that seems specific to GraalVM native image.
The error stack trace is:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:459)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
at reactor.kafka.sender.internals.ProducerFactory.createProducer(ProducerFactory.java:34)
at reactor.kafka.sender.internals.DefaultKafkaSender.lambda$new$2(DefaultKafkaSender.java:102)
at reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
at reactor.core.publisher.Flux.subscribe(Flux.java:8825)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8961)
at reactor.core.publisher.Flux.subscribe(Flux.java:8805)
at reactor.core.publisher.Flux.subscribe(Flux.java:8729)
at reactor.core.publisher.Flux.subscribe(Flux.java:8647)
at com.Service.run(Service.java:51)
at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:790)
at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:83)
at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60)
at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:88)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789)
at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:774)
at java.base@21.0.2/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base@21.0.2/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357)
at java.base@21.0.2/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:510)
at java.base@21.0.2/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base@21.0.2/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base@21.0.2/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base@21.0.2/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base@21.0.2/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:774)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:341)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
at com.Application.main(Application.java:18)
at java.base@21.0.2/java.lang.invoke.LambdaForm$DMH/sa346b79c.invokeStaticInit(LambdaForm$DMH)
Caused by: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.KafkaJsonSerializer
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:385)
... 35 common frames omitted
Caused by: java.lang.NoSuchMethodException: io.confluent.kafka.serializers.KafkaJsonSerializer.<init>()
at java.base@21.0.2/java.lang.Class.checkMethod(DynamicHub.java:1075)
at java.base@21.0.2/java.lang.Class.getConstructor0(DynamicHub.java:1238)
at java.base@21.0.2/java.lang.Class.getDeclaredConstructor(DynamicHub.java:2930)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:397)
... 39 common frames omitted
Here is the snippet for the configuration:
    @Bean
    public KafkaSender<String, Log> kafkaSender(final MeterRegistry meterRegistry, final ObservationRegistry observationRegistry) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(SSL_PROTOCOL, SSL_VALUE);
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassphrase);
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassphrase);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaJsonSerializer.class);
        final SenderOptions<String, Log> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions
                .producerListener(new MicrometerProducerListener(meterRegistry))
                .withObservation(observationRegistry));
    }
It seems one needs to work with hint registrars or some config.json file to fix GraalVM-related issues like this one.
What is needed to fix this issue?
Kafka uses reflection to look up various classes named in the properties, such as serializers.
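To illustrate the kind of lookup involved, here is a minimal sketch of the pattern visible in the stack trace (Class.getDeclaredConstructor followed by instantiation). It is not Kafka's actual code: the class ReflectiveInstantiation and the nested DemoSerializer and NoDefaultConstructor types are hypothetical stand-ins. On a regular JVM the lookup succeeds for any class with a no-arg constructor. In a native image the constructor is only found if it was registered for reflection at build time, which would explain the NoSuchMethodException above.

```java
import java.lang.reflect.Constructor;

public class ReflectiveInstantiation {

    /** Hypothetical stand-in for a serializer with a public no-arg constructor. */
    public static class DemoSerializer {
        public DemoSerializer() { }
    }

    /** Hypothetical class without a no-arg constructor, to show the failure case. */
    public static class NoDefaultConstructor {
        public NoDefaultConstructor(int ignored) { }
    }

    /**
     * Looks up the no-arg constructor reflectively and invokes it.
     * Throws NoSuchMethodException when no such constructor is visible.
     */
    public static Object newInstance(Class<?> type) throws ReflectiveOperationException {
        Constructor<?> ctor = type.getDeclaredConstructor();
        return ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        Object serializer = newInstance(DemoSerializer.class);
        System.out.println(serializer.getClass().getSimpleName());
    }
}
```

Calling newInstance(NoDefaultConstructor.class) throws NoSuchMethodException on the JVM as well, which is the same exception type the native image reports when the constructor metadata is missing.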
You'll need to define a reflection config file (typically reflect-config.json under META-INF/native-image/ on the classpath) to enable certain reflection calls, such as looking up constructors:
    [
      {
        "name": "io.confluent.kafka.serializers.KafkaJsonSerializer",
        "allDeclaredConstructors": true
      }
    ]
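Since the question mentions registrars: with Spring Boot 3 the same hint can probably be expressed in code through Spring's RuntimeHintsRegistrar instead of a JSON file. The following is a minimal sketch, assuming Spring Framework 6.1 (the version Spring Boot 3.2.4 is built on). The names KafkaHintsConfiguration and KafkaSerializerHints are hypothetical, and I have not verified this against the project in the question.

```java
import org.springframework.aot.hint.MemberCategory;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.aot.hint.TypeReference;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;

// Hypothetical configuration class; the annotation could equally be placed
// on the existing class that declares the kafkaSender bean.
@Configuration
@ImportRuntimeHints(KafkaHintsConfiguration.KafkaSerializerHints.class)
public class KafkaHintsConfiguration {

    // Registers the serializer's constructors for reflective invocation,
    // mirroring "allDeclaredConstructors": true from the JSON config.
    static class KafkaSerializerHints implements RuntimeHintsRegistrar {
        @Override
        public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
            hints.reflection().registerType(
                    TypeReference.of("io.confluent.kafka.serializers.KafkaJsonSerializer"),
                    MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        }
    }
}
```

Either approach should work on its own, so there is no need for both. If further classes fail the same way at runtime (for example other serializers, or the Log type itself if the JSON serializer inspects it reflectively), they would need equivalent entries.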