apache-flink kinesis-stream

Apache Flink KinesisStreamsSink: PKIX path building failed


I have a local Kinesalite stream, to which I am trying to send data from my local Flink job, built in application mode:

        Properties sinkProperties = new Properties();
        sinkProperties.put(AWSConfigConstants.AWS_REGION, region);
        sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, aws_secret_access_key);
        sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, aws_access_key);
        sinkProperties.put(AWSConfigConstants.AWS_ENDPOINT, aws_endpoint);
        sinkProperties.put(AWSConfigConstants.TRUST_ALL_CERTIFICATES, true);
    
        DataStream<String> fromGen =
                env.fromSequence(1, 10_000_000L)
                        .map(Object::toString)
                        .returns(String.class)
                        .map(data -> data.toString());

        KinesisStreamsSink<String> kdsSink =
                KinesisStreamsSink.<String>builder()
                        .setSerializationSchema(new SimpleStringSchema())
                        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                        .setStreamName(outputStreamName)
                        .setMaxBatchSize(20)
                        .setKinesisClientProperties(sinkProperties)
                        .build();

        fromGen.sinkTo(kdsSink);

I get the following error after emitting new data to this stream:

15:48:53,933 WARN  org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline [] - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-connector-kinesis-1.15.2.jar:1.15.2]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alert.createSSLException(Alert.java:131) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:353) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:296) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:291) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175) ~[?:?]
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    ... 16 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) ~[?:?]
    at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:276) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175) ~[?:?]
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    ... 16 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141) ~[?:?]
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126) ~[?:?]
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) ~[?:?]
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) ~[?:?]
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) ~[?:?]
    at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:276) ~[?:?]
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232) ~[?:?]
    at sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175) ~[?:?]
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392) ~[?:?]
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008) ~[?:?]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ~[flink-connector-kinesis-1.15.2.jar:1.15.2]
    ... 16 more

This is really strange: I am able to read from local streams, and the problem only appears when I try to put new records to them.

There were hardly any resources online on how to solve this. I found https://github.com/galgus/flink-connector-http, but the workaround did not work for me (the same error happens).


Solution

  • The Flink application version differs from the recommended AWS Flink version, and the SDK that this older version uses to connect to Kinesis is buggy. So there is currently no way to fix this; we chose a different approach for handling data locally.
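
One detail in the question's configuration may be worth ruling out as well. This is an assumption, not something confirmed above: TRUST_ALL_CERTIFICATES is stored with put(..., true), i.e. as a Boolean rather than a String. java.util.Properties.getProperty only returns values that are Strings, so if the connector reads the flag through getProperty, it would never see it. Whether the 1.15.2 connector reads the flag that way has not been verified here. The sketch below uses only the JDK and a made-up key name (demo.trust.all.certificates, not a Flink constant) to show the behaviour:

```java
import java.util.Properties;

class PropertiesFlagDemo {

    // Hypothetical key, used only for this illustration; it is not a Flink constant.
    static final String KEY = "demo.trust.all.certificates";

    // Reads a boolean flag the way a String-based config reader would:
    // getProperty() returns null for non-String values, so the flag falls back to false.
    static boolean readFlag(Properties props) {
        String raw = props.getProperty(KEY);
        return raw != null && Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        Properties asBoolean = new Properties();
        asBoolean.put(KEY, true);            // stored as java.lang.Boolean

        Properties asString = new Properties();
        asString.setProperty(KEY, "true");   // stored as java.lang.String

        System.out.println("Boolean value visible: " + readFlag(asBoolean));
        System.out.println("String value visible:  " + readFlag(asString));
    }
}
```

The Boolean variant is read back as false and the String variant as true. If this applies to the sink, passing the value as the string "true" in sinkProperties would be the change to try. It does not contradict the version mismatch described above.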