apache-kafka apache-nifi confluent-cloud

Connecting from Apache NiFi to Confluent Cloud


We are trying to connect to Confluent Cloud from Apache NiFi using the PublishKafka processor, configured with an SSL setting and a Kafka API key and secret. We get the following error:

Unexpected error from pkckg.eastus.azure.confluent.cloud/20.23>
java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty

org.apache.kafka.common.network.Selector [Consumer clientId=consumer-TestConnection-1, groupId=TestConnection] Unexpected error from pkc-56d1g.eastus.a>
java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty

2022-12-09 01:01:32,078 WARN [Timer-Driven Process Thread-7] org.apache.kafka.clients.NetworkClient [Consumer clientId=consumer-TestConnection-1, groupId=TestConnection] Connection to node -1 (xxxx.eastus.azure.confluent.cloud/20.2>
2022-12-09 01:01:32,078 WARN [Timer-Driven Process Thread-7] org.apache.kafka.clients.NetworkClient [Consumer clientId=consumer-TestConnection-1, groupId=TestConnection] Bootstrap broker pkc-56d1g.eastus.azure.confluent.cloud:9092 (id: >
2022-12-09 01:01:33,055 WARN [kafka-producer-network-thread | producer-4] org.apache.kafka.common.network.Selector [Producer clientId=producer-4] Unexpected error from pkc-56d1g.eastus.azure.confluent.cloud/20.237.15.111; closing connec>
java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at java.base/sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:102)
    at java.base/sun.security.validator.Validator.getInstance(Validator.java:181)
    at java.base/sun.security.ssl.X509TrustManagerImpl.getValidator(X509TrustManagerImpl.java:300)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrustedInit(X509TrustManagerImpl.java:176)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:246)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
    at java.base/java.security.cert.PKIXParameters.setTrustAnchors(PKIXParameters.java:200)
    at java.base/java.security.cert.PKIXParameters.<init>(PKIXParameters.java:120)
    at java.base/java.security.cert.PKIXBuilderParameters.<init>(PKIXBuilderParameters.java:104)
    at java.base/sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:99)
    ... 25 common frames omitted
2022-12-09 01:01:33,055 WARN [kafka-producer-network-thread | producer-4] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-4] Connection to node -1 (xxxx.eastus.azure.confluent.cloud/x.x.x.x:9092) terminated>
2022-12-09 01:01:33,055 WARN [kafka-producer-network-thread | producer-4] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-4] Bootstrap broker pkc-56d1g.eastus.azure.confluent.cloud:9092 (id: -1 rack: null) disconnected
2022-12-09 01:01:33,087 WARN [Timer-Driven Process Thread-8] org.apache.kafka.common.network.Selector [Consumer clientId=consumer-TestConnection-1, groupId=TestConnection] Unexpected error from pkc-56d1g.eastus.azure.confluent.cloud/20.>
java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty


Solution

  • The following steps resolved the connectivity issue:

    1. Generate an API key and secret with the Confluent CLI.
    2. In Apache NiFi, create a new StandardRestrictedSSLContextService or StandardSSLContextService that points to the truststore holding the CA certificate, giving its path and password. If the certificate was copied to the default path, reference that path and password in the controller service. Note: the default truststore password is changeit.
    3. In the PublishKafkaRecord and ConsumeKafkaRecord NiFi processors, reference this SSL context service and supply the API key and secret from step 1.

    That should be all that is needed to connect Apache NiFi to Confluent Cloud.
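
For anyone hitting the same trustAnchors error, here is a minimal Java sketch (not NiFi's own code) of the two things the steps above rely on. countTrustAnchors opens a truststore and counts its trusted-certificate entries; an empty truststore or a wrong path is the usual reason the JVM reports "the trustAnchors parameter must be non-empty". clientProperties builds the plain Kafka client settings that an API key and secret normally translate to for Confluent Cloud (SASL_SSL with the PLAIN mechanism). The class and method names, the JDK 9+ cacerts location, and the placeholder bootstrap server and credentials are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.Properties;

public class ConfluentTlsCheck {

    /** Counts the trusted-certificate entries in a truststore file. */
    public static int countTrustAnchors(Path trustStore, char[] password, String type)
            throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(trustStore)) {
            ks.load(in, password);
        }
        int count = 0;
        for (String alias : Collections.list(ks.aliases())) {
            if (ks.isCertificateEntry(alias)) {
                count++;
            }
        }
        return count;
    }

    /** Builds standard Kafka client settings for an API key and secret (SASL_SSL + PLAIN). */
    public static Properties clientProperties(String bootstrapServers, String apiKey,
                                              String apiSecret, Path trustStore,
                                              String trustStorePassword) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "PLAIN");
        p.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        p.setProperty("ssl.truststore.location", trustStore.toString());
        p.setProperty("ssl.truststore.password", trustStorePassword);
        return p;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: JDK 9+ layout, where the default truststore is lib/security/cacerts.
        Path cacerts = Path.of(System.getProperty("java.home"), "lib", "security", "cacerts");
        if (!Files.exists(cacerts)) {
            System.out.println("No truststore found at " + cacerts);
            return;
        }
        int anchors = countTrustAnchors(cacerts, "changeit".toCharArray(),
                KeyStore.getDefaultType());
        System.out.println(cacerts + " holds " + anchors + " trusted certificates");

        // Placeholder values; substitute your own bootstrap server, API key and secret.
        Properties props = clientProperties("<bootstrap-server>:9092", "<api-key>",
                "<api-secret>", cacerts, "changeit");
        System.out.println("Client settings: " + props.stringPropertyNames());
    }
}
```

If the count printed for the truststore NiFi is pointed at is zero, or the file is missing, fixing the path or importing the CA certificate is the first thing to try before touching the processor settings.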