I'm trying to connect to a remote Kafka broker as a consumer (Kafka version 0.11.0.3) over SSL using the command-line tool. The command is the following:
kafka-console-consumer.bat \
--bootstrap-server host:port \
--topic topicName \
--from-beginning \
--group groupId \
--consumer.config ssl.properties
The ssl.properties file:
security.protocol=SSL
ssl.truststore.location=path/to/truststore.jks
ssl.truststore.password=1234567
ssl.keystore.location=path/to/keystore.jks
ssl.keystore.password=1234567
ssl.key.password=1234567
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.type=JKS
ssl.keystore.type=JKS
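For reference, the certificates the client trusts can be listed with keytool (a sketch, using the truststore path and password from the properties above; the -v output shows each entry's Subject, Issuer and Extensions, including any SubjectAlternativeName):
keytool -list -v \
  -keystore path/to/truststore.jks \
  -storepass 1234567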
The exception I get:
[2020-04-28 17:24:39,522] ERROR [Consumer clientId=consumer-1, groupId=binomix] Connection to node -1 (/178.208.149.84:9301) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2020-04-28 17:24:39,524] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1582)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:544)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1216)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1184)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:471)
at org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:448)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:313)
at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:198)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:333)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:325)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1689)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:226)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1082)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:1015)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:1012)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1520)
at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
... 18 more
Caused by: java.security.cert.CertificateException: No subject alternative names present
at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:145)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:459)
at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:440)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:284)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1676)
... 27 more
Processed a total of 0 messages
Any ideas why this happens? Is it an issue with the certificates? I did not generate them; they were given to me by the Kafka server owner. Thanks in advance.
It seems that the Subject Alternative Name (SAN) is missing from your brokers' certificates (i.e. the certificates stored in the server keystores, for example /var/private/ssl/kafka.server.keystore.jks).
To include a SAN, append the argument -ext SAN=DNS:{FQDN} to the keytool command:
keytool \
-keystore kafka.server.keystore.jks \
-alias localhost \
-validity {validity} \
-genkey \
-keyalg RSA \
-ext SAN=DNS:{FQDN}
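To confirm the SAN made it into the certificate (a sketch, assuming the keystore and alias from the command above; keytool prompts for the keystore password), list the entry and look for a SubjectAlternativeName extension in the Extensions section:
keytool -list -v \
  -keystore kafka.server.keystore.jks \
  -alias localhost
If that extension is absent, the broker certificate will fail the client's host name check exactly as in the trace above.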
Make sure to include the SAN when creating the servers' keystores. This is also mentioned in Confluent's Security Tutorial:
If host name verification is enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:
- Common Name (CN)
- Subject Alternative Name (SAN)
Both fields are valid, however RFC-2818 recommends the use of SAN. SAN is also more flexible, allowing for multiple DNS entries to be declared. Another advantage is that the CN can be set to a more meaningful value for authorization purposes.
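Note that your stack trace fails in HostnameChecker.matchIP and your bootstrap server is an IP address (178.208.149.84), so the client is trying to match that IP against an IP-type SAN entry. In that case the certificate needs an IP entry as well, or you should bootstrap via a host name that the certificate covers. A sketch of a keytool invocation with both DNS and IP entries (the host name and validity below are placeholders):
keytool \
  -keystore kafka.server.keystore.jks \
  -alias localhost \
  -validity 365 \
  -genkey \
  -keyalg RSA \
  -ext SAN=DNS:broker1.example.com,IP:178.208.149.84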
Alternatively, you can choose to disable server host verification:
Disable server host name verification by setting ssl.endpoint.identification.algorithm to an empty string.
Since the host name verification that fails in your trace runs on the client side, add the following line to the consumer's ssl.properties (the file passed via --consumer.config). If your brokers also verify host names for inter-broker SSL, set it in server.properties as well and restart your Kafka cluster:
ssl.endpoint.identification.algorithm=
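Either way, you can check what certificate the broker actually presents, and whether it carries any SAN, directly from the client machine (a sketch, assuming openssl is available; host and port as in your --bootstrap-server):
openssl s_client -connect 178.208.149.84:9301 </dev/null 2>/dev/null \
  | openssl x509 -noout -text \
  | grep -A1 'Subject Alternative Name'
If nothing is printed, the certificate has no SAN and you are back to the two options above: regenerate it with -ext SAN=... or disable host name verification on the client.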