python-3.x, apache-kafka, apache-flink, zeek

Upgrading Apache Flink Python w/ Kafka Connector from 1.15.4 to 1.16+ causes existing consumer/producers to disconnect


I have a setup running an Apache Kafka broker via Confluent cp-kafka 7.2, with ZooKeeper, a Logstash consumer, and a custom Kafka producer container (Zeek with the metron-bro-plugin-kafka plugin, https://github.com/apache/metron-bro-plugin-kafka, which uses librdkafka).

When I create a consumer using Apache Flink 1.16 (any Apache Flink release newer than 1.15.4, actually) and the matching flink-sql-connector-kafka JAR in Python, the Logstash consumer and Zeek producer are disconnected and do not recover for several minutes. If I switch to Python 3.7 and Flink 1.15.4, the disconnect does not occur. Additionally, while these services are attempting to reconnect, the broker is completely unavailable to CLI queries (e.g. kafka-consumer-groups).
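For reference, the consumer is created roughly like this (a minimal PyFlink KafkaSource sketch, not my exact job; the topic name, group id, and JAR path are placeholders, and localhost:29092 matches the EXTERNAL listener shown in the logs):

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

    env = StreamExecutionEnvironment.get_execution_environment()
    # The flink-sql-connector-kafka JAR must be on the job's classpath;
    # the path below is a placeholder for wherever the jar actually lives.
    env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-1.16.0.jar")

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("localhost:29092")   # EXTERNAL PLAINTEXT listener from the logs
        .set_topics("zeek")                         # placeholder topic name
        .set_group_id("flink-consumer")             # placeholder group id
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source")
    stream.print()
    env.execute("zeek-kafka-consumer")

Only the Flink version and the connector JAR differ between the working (1.15.4) and failing (1.16+) runs.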

Kafka logs:

---logstash related---
[2024-04-01 23:58:18,586] INFO [GroupCoordinator 1]: Assignment received from leader logstash-0-4bb31f6d-f4e0-4196-afb9-c3be6d251a77 for group logstash for generation 20. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-04-01 23:58:33,598] INFO [GroupCoordinator 1]: Member logstash-0-4bb31f6d-f4e0-4196-afb9-c3be6d251a77 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
---zeek related---
[2024-04-01 23:58:54,164] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-16, correlationId=2) -- InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-16, correlationId=2), connectionId='172.20.0.7:29092-172.20.0.1:43094-4', clientAddress=/172.20.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(EXTERNAL), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@4c7701d4]) (kafka.server.KafkaApis)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for next producer ID block
[2024-04-01 23:58:54,193] INFO [BrokerToControllerChannelManager broker=1 name=forwarding] Disconnecting from node 1 due to request timeout. (org.apache.kafka.clients.NetworkClient)
[2024-04-01 23:58:54,194] INFO [BrokerToControllerChannelManager broker=1 name=forwarding] Cancelled in-flight API_VERSIONS request with correlation id 1 due to node 1 being disconnected (elapsed time since creation: 30029ms, elapsed time since send: 30029ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient)
[2024-04-01 23:58:54,195] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use broker nids-kafka-cntr:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)

Zeek logs:

[disconnected broker msgs]
%4|1712015304.258|FAIL|rdkafka#producer-7| [thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 10009ms in state APIVERSION_QUERY, 3 identical error(s) suppressed)
%3|1712015304.258|ERROR|rdkafka#producer-7| [thrd:localhost:29092/bootstrap]: 1/1 brokers are down
%4|1712015304.258|REQTMOUT|rdkafka#producer-7| [thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests

Logstash Logs:

[2024-04-01T23:58:31,613][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main][b13590b3df4211af5265ef2746ad3101d589af4a46691ce4ad66a9091cfd09cb] [Consumer clientId=logstash-0, groupId=logstash] Group coordinator nids-kafka-cntr:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
[2024-04-01T23:59:04,649][INFO ][org.apache.kafka.clients.FetchSessionHandler][main][b13590b3df4211af5265ef2746ad3101d589af4a46691ce4ad66a9091cfd09cb] [Consumer clientId=logstash-0, groupId=logstash] Error sending fetch request (sessionId=430766704, epoch=14) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException: null

Any idea what the issue could be? Thank you!

Tried: Python 3.7, 3.9, and 3.10 with Apache Flink 1.15.4 through 1.18.

Everything works on Python 3.7 with Flink 1.15.4; it fails on newer versions of Python/Flink.


Solution

  • Upgrade to Confluent cp-kafka and cp-zookeeper 7.6.0 and the error is resolved (see the compose sketch below).
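For anyone running a similar containerized setup, the fix amounts to bumping the Confluent image tags. A minimal docker-compose sketch, assuming the broker and ZooKeeper are defined there (service names are placeholders, not my exact file):

    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.6.0   # was 7.2.x
        # remaining settings unchanged from the 7.2 setup
      kafka:
        image: confluentinc/cp-kafka:7.6.0       # was 7.2.x
        # remaining settings unchanged from the 7.2 setup

After the upgrade, the Flink 1.16+ PyFlink consumer no longer knocks the Logstash consumer and Zeek producer offline.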