I am developing a GCP Cloud Dataflow job that uses a Kafka broker and Schema Registry. Our Kafka broker and Schema Registry require a TLS client certificate, and I am running into a connection issue with the Schema Registry on deployment. Any suggestion is highly welcome.
Here is what I do for the Dataflow job. First, I create consumer properties with the TLS configuration.
props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc"));
props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);
Then I update the consumer properties with updateConsumerProperties.
Pipeline p = Pipeline.create(options)
...
.updateConsumerProperties(properties)
...
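For context, a fuller sketch of how that read could be wired up (the bootstrap servers, topic, and deserializer classes below are placeholders for the parts elided above; properties is the Map<String, Object> with the TLS settings built earlier):
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers("broker-host:9093")            // placeholder
    .withTopic("my-topic")                                // placeholder
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)   // the real job uses a custom Avro deserializer
    .updateConsumerProperties(properties));               // merges the TLS settings into the consumer config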
As the following Stack Overflow answer suggests, I also download the keyStore and trustStore to a local directory and set the trustStore / keyStore locations on the consumer properties in a ConsumerFactory.
Truststore and Google Cloud Dataflow
Pipeline p = Pipeline.create(options)
...
.withConsumerFactoryFn(new MyConsumerFactory(...))
...
In ConsumerFactory:
public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
// download keyStore and trustStore from GCS bucket
config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
new KafkaConsumer<byte[], byte[]>(config);
}
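For reference, the download step (the comment above) might look roughly like the helper below. This is only a sketch: it assumes the google-cloud-storage client library is on the classpath, and the helper name, bucket, and object names are hypothetical.
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

// Hypothetical helper: copies gs://<bucket>/<object> to a local temp file and returns its path.
static String downloadToTemp(String bucket, String object) {
  try {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Blob blob = storage.get(BlobId.of(bucket, object));
    Path local = Files.createTempFile("kafka-", ".jks");
    blob.downloadTo(local);
    return local.toString();
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
localTrustStoreFilePath and localKeyStoreFilePath above would then be the return values of two such calls.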
With this code the deployment succeeded, but the Dataflow job got a TLS server certificate verification error:
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
sun.security.validator.Validator.validate(Validator.java:260)
sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Then I found that the Schema Registry client loads its TLS configuration from JVM system properties: https://github.com/confluentinc/schema-registry/issues/943
I tested a plain Kafka consumer with the same configuration and confirmed that it works fine.
props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));
Next I applied the same approach to the Dataflow job code, i.e. setting the same TLS configuration both as system properties and in the consumer properties.
I specified the passwords as system properties when launching the application.
-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \
Note: I set the system properties for the trustStore and keyStore locations in the ConsumerFactory, since those files are downloaded to a local temp directory there.
config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)
But this time even the deployment failed, with the following error:
Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
...
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
at java.security.Provider$Service.newInstance(Provider.java:1617)
...
Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect
at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:780)
Caused by: java.security.UnrecoverableKeyException: Password verification failed
at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:778)
Am I missing something?
I got a reply from GCP support. It seems that Apache Beam does not support Schema Registry:
Hello, the Dataflow specialist has reached me back. I will now expose what they have told me.
The answer to your question is no, Apache Beam does not support Schema Registry. However, they have told me that you could implement the calls to Schema Registry by yourself as Beam only consumes raw messages and it is user's responsibility to do whatever they need with the data.
This is based on our understanding of the case that you want to publish messages to Kafka, and have DF consume those messages, parsing them based on the schema from the registry.
I hope this information can be useful to you, let me know if I can be of further help.
But the Dataflow job can still receive Avro-encoded binary messages, so you can call the Schema Registry REST API yourself inside the pipeline, as described in this answer: https://stackoverflow.com/a/55917157
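One way to do this, sketched under the assumption that KafkaIO stays on raw byte[] values and decoding happens downstream: run Confluent's KafkaAvroDeserializer inside a DoFn, configured per worker. The registry URL, store paths, and passwords below are placeholders, and the trust/key stores are assumed to have been downloaded to the worker (e.g. from GCS) before setup runs.
import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Decodes Confluent-framed Avro payloads by fetching the writer schema from Schema Registry.
class DecodeAvroFn extends DoFn<byte[], GenericRecord> {
  private transient KafkaAvroDeserializer deserializer;

  @Setup
  public void setup() {
    // The registry's HTTPS client picks up TLS settings from JVM-wide system properties.
    System.setProperty("javax.net.ssl.trustStore", "/tmp/truststore.jks");   // placeholder
    System.setProperty("javax.net.ssl.trustStorePassword", "ccc");           // placeholder
    System.setProperty("javax.net.ssl.keyStore", "/tmp/keystore.jks");       // placeholder
    System.setProperty("javax.net.ssl.keyStorePassword", "aaa");             // placeholder

    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "https://schema-registry-host:port");  // placeholder
    deserializer = new KafkaAvroDeserializer();
    deserializer.configure(config, false);   // false = configure as a value deserializer
  }

  @ProcessElement
  public void processElement(@Element byte[] payload, OutputReceiver<GenericRecord> out) {
    // The schema id is embedded in the payload, so the topic name here is only informational.
    out.output((GenericRecord) deserializer.deserialize("my-topic", payload));
  }
}
Note that you would still need a coder for the output (for example AvroCoder with the expected schema) or a mapping to a concrete type, and depending on the client version the registry client may also accept schema.registry.ssl.* settings directly in the config instead of the global system properties.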