apache-kafka, google-cloud-dataflow, apache-beam, confluent-schema-registry

Unable to connect from Dataflow job to Schema Registry when Schema Registry requires TLS client authentication


I am developing a GCP Cloud Dataflow job that uses a Kafka broker and Schema Registry. Our Kafka broker and Schema Registry require a TLS client certificate, and I am facing a connection issue with the Schema Registry on deployment. Any suggestions are welcome.

Here is what I do in the Dataflow job. First I create the consumer properties with the TLS configuration.

props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc"));
props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);

Then I apply these properties to the KafkaIO source via updateConsumerProperties.

Pipeline p = Pipeline.create(options)
...
.updateConsumerProperties(properties)
... 
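For context, this is roughly how the properties plug into KafkaIO. It is only a sketch: the bootstrap servers, topic, and byte-array deserializers are placeholders for my actual values, and props is the Map<String, Object> built above.

PCollection<KV<byte[], byte[]>> messages = p.apply(
    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("broker-host:9093")           // placeholder
        .withTopic("my-topic")                               // placeholder
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .updateConsumerProperties(props)                     // TLS + Schema Registry settings from above
        .withoutMetadata());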

As this Stack Overflow answer suggests, I also download the keystore and truststore to a local directory and set the truststore / keystore locations on the consumer properties inside a custom ConsumerFactory.

Truststore and Google Cloud Dataflow

Pipeline p = Pipeline.create(options)
 ...
 .withConsumerFactoryFn(new MyConsumerFactory(...))
 ...

In ConsumerFactory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
  // download keyStore and trustStore from GCS bucket
  config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
  config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
  return new KafkaConsumer<byte[], byte[]>(config);
}
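For completeness, here is roughly what the whole factory looks like. This is a simplified sketch, not my exact code: the GCS paths and /tmp file names are placeholders, and the download via Beam's FileSystems API is an assumption (any GCS client would do, and it relies on the GCS filesystem being available on the worker).

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumerFactory
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  // Placeholder GCS locations of the JKS files.
  private static final String TRUSTSTORE_GCS = "gs://my-bucket/kafka.truststore.jks";
  private static final String KEYSTORE_GCS = "gs://my-bucket/kafka.keystore.jks";

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    try {
      // Stage the key material in the worker's temp directory and point the
      // Kafka consumer at the local copies.
      String localTrustStoreFilePath = copyToLocal(TRUSTSTORE_GCS, "kafka.truststore.jks");
      String localKeyStoreFilePath = copyToLocal(KEYSTORE_GCS, "kafka.keystore.jks");
      config.put("ssl.truststore.location", localTrustStoreFilePath);
      config.put("ssl.keystore.location", localKeyStoreFilePath);
      return new KafkaConsumer<byte[], byte[]>(config);
    } catch (IOException e) {
      throw new RuntimeException("Failed to stage TLS key material on the worker", e);
    }
  }

  // Copies a single GCS object to the local temp directory via Beam's FileSystems API.
  private static String copyToLocal(String gcsPath, String fileName) throws IOException {
    Path target = Paths.get(System.getProperty("java.io.tmpdir"), fileName);
    try (InputStream in = Channels.newInputStream(
        FileSystems.open(FileSystems.matchNewResource(gcsPath, false /* isDirectory */)))) {
      Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
    }
    return target.toString();
  }
}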

With this code, deployment succeeded, but the Dataflow job failed with a TLS server certificate verification error:

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
        sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
        sun.security.validator.Validator.validate(Validator.java:260)
        sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
        io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Then I found that the Schema Registry client loads its TLS configuration from system properties: https://github.com/confluentinc/schema-registry/issues/943

I tested a plain Kafka consumer with the same configuration and confirmed that it works fine.

props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));

Next I applied the same approach to the Dataflow job code, i.e. setting the same TLS configuration in both system properties and the consumer properties.

I specified the passwords via system properties when launching the application.

-Djavax.net.ssl.keyStorePassword=bbb \
-Djavax.net.ssl.key.password=ccc \
-Djavax.net.ssl.trustStorePassword=aaa \

Note: I set the system properties for the truststore and keystore locations inside the ConsumerFactory, since those files are downloaded to a local temp directory on the worker.

config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)

But this time even the deployment failed, with the following error:

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp. 
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
...
Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:780)
Caused by: java.security.UnrecoverableKeyException: Password verification failed
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:778)

Am I missing something?


Solution

  • I got a reply from GCP support. It seems that Apache Beam does not support Schema Registry.

    Hello, the Dataflow specialist has reached me back. I will now expose what they have told me.

    The answer to your question is no, Apache Beam does not support Schema Registry. However, they have told me that you could implement the calls to Schema Registry by yourself as Beam only consumes raw messages and it is user's responsibility to do whatever they need with the data.

    This is based on our understanding of the case that you want to publish messages to Kafka, and have DF consume those messages, parsing them based on the schema from the registry.

    I hope this information can be useful to you, let me know if I can be of further help.

    But the Dataflow job can still receive the Avro-encoded binary messages, so you can call the Schema Registry REST API yourself from the pipeline, as shown in this answer: https://stackoverflow.com/a/55917157
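    For reference, here is a minimal sketch of that idea (my own reconstruction, not the exact code from the linked answer): a custom Kafka value deserializer that resolves the writer schema from Schema Registry over HTTPS (GET /schemas/ids/{id}) and then decodes the Confluent wire format (magic byte, 4-byte schema id, Avro binary payload). The registry URL is a placeholder, and the HTTPS call relies on the javax.net.ssl.* system properties being set on the worker, e.g. in the ConsumerFactory as shown above.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.HttpsURLConnection;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.serialization.Deserializer;

public class ManualRegistryAvroDeserializer implements Deserializer<GenericRecord> {

  private static final String REGISTRY_URL = "https://host:port"; // placeholder
  private final Map<Integer, Schema> schemaCache = new ConcurrentHashMap<>();

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public GenericRecord deserialize(String topic, byte[] data) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    if (buffer.get() != 0) { // Confluent wire format starts with magic byte 0
      throw new IllegalArgumentException("Unknown magic byte");
    }
    int schemaId = buffer.getInt(); // next 4 bytes: schema id
    Schema writerSchema = schemaCache.computeIfAbsent(schemaId, this::fetchSchema);
    try {
      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
          data, buffer.position(), data.length - buffer.position(), null);
      return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
    } catch (IOException e) {
      throw new RuntimeException("Avro decoding failed", e);
    }
  }

  // GET /schemas/ids/{id} and parse the "schema" field of the JSON response.
  // Uses the default SSLContext, so truststore/keystore must be supplied via
  // javax.net.ssl.* system properties on the worker.
  private Schema fetchSchema(int id) {
    try {
      HttpsURLConnection conn =
          (HttpsURLConnection) new URL(REGISTRY_URL + "/schemas/ids/" + id).openConnection();
      try (InputStream in = conn.getInputStream()) {
        JsonNode response = new ObjectMapper().readTree(in);
        return new Schema.Parser().parse(response.get("schema").asText());
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed to fetch schema " + id, e);
    }
  }

  @Override
  public void close() {}
}

    This can then be invoked either from a DoFn on the raw byte[] messages or wired in as the value deserializer.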