java kubernetes apache-kafka envoyproxy

Start consuming Kafka messages after the Envoy sidecar becomes ready


I have an application running in Prod in Kubernetes, and during deployment I get a bunch of errors like this:

15:12:03.495 [xx-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1925)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1348)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition xxxxxxxx-15 at offset xxxxx. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283)
    at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168)
    at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134)
    at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1625)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1600)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 19222
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:345)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:53)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321)
    ... 14 common frames omitted
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.pollConnect(Native Method)
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
    at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:547)
    at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:602)
    at java.base/java.net.Socket.connect(Socket.java:639)
    at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:178)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:533)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:638)
    at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:281)
    at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:386)
    at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:408)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1312)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1245)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1131)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1060)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1690)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1614)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:281)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:840)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:813)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:294)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:417)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:342)
    ... 19 common frames omitted

I checked the Envoy sidecar logs: the Kafka consumers start consuming messages before the Envoy sidecar is ready. Since the consumer depends on the Kafka Schema Registry, the schema lookup fails with "Connection refused" and the payload cannot be deserialised into the desired schema.

Is there a way to delay message consumption until Envoy is ready? A Kubernetes readiness probe does not help here, since the Kafka consumers start consuming messages regardless of the pod's readiness.

Can someone share what has worked for them in this situation?


Solution

  • I'm wondering if a "startupProbe" for your Kafka consumer could solve your problem.

    startupProbe:
      exec:
        command:
          - sh
          - '-c'
          - >
            command_to_check_envoy_is_ready
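
If the probe alone is not enough, the same wait could also be done inside the application. Below is a minimal sketch using only the JDK: it polls a readiness URL until it returns HTTP 200 and only then reports success. The class name `SidecarGate`, the retry numbers, and the URL `http://127.0.0.1:9901/ready` are assumptions for illustration; the address and port of the Envoy readiness endpoint depend on how your sidecar is configured. In a Spring Kafka application, one possible wiring (also an assumption about your setup) is to set `spring.kafka.listener.auto-startup=false` and call `KafkaListenerEndpointRegistry.start()` once the wait succeeds.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: blocks until a readiness check passes or attempts run out.
final class SidecarGate {

    /** Polls the check up to maxAttempts times, sleeping between failed attempts. */
    static boolean awaitReady(BooleanSupplier check, int maxAttempts, Duration delay)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delay.toMillis());
            }
        }
        return false;
    }

    /** Builds a check that is true only when a GET on the URL returns HTTP 200. */
    static BooleanSupplier httpOk(String url) {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(1))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(1))
                .GET()
                .build();
        return () -> {
            try {
                return client.send(request, HttpResponse.BodyHandlers.discarding())
                        .statusCode() == 200;
            } catch (IOException e) {
                // Includes "Connection refused" while the sidecar is not listening yet.
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed readiness URL; pass your own as the first argument.
        String url = args.length > 0 ? args[0] : "http://127.0.0.1:9901/ready";
        boolean ready = awaitReady(httpOk(url), 30, Duration.ofSeconds(1));
        System.out.println(ready ? "sidecar ready" : "sidecar not ready");
        // In a Spring Kafka app, this is the point where the listener
        // containers would be started, and only if ready is true.
    }
}
```

Keeping the check behind a `BooleanSupplier` means the retry loop can be exercised without a running sidecar, and the HTTP check can be swapped for whatever `command_to_check_envoy_is_ready` does in your environment.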