docker, google-cloud-dataflow, apache-beam, apache-beam-kafkaio

Fetch Truststore File Inside a Flex Template Image for Confluent Kafka


We are trying to store the truststore.jks file inside the Flex Template Docker image, but the pipeline is unable to locate it at runtime.

We pulled the image and can see that the file is present in the container at /tmp/trust.jks.
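A quick check along these lines confirms it is baked in (IMAGE_URI stands in for our template image URI):

docker run --rm --entrypoint ls IMAGE_URI -l /tmp/trust.jks

But when the pipeline uses that same path, we get the following error.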

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:67)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:134)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 42 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:154)
... 46 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:155)
... 49 more
Caused by: java.nio.file.NoSuchFileException: /tmp/trust.jks
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
java.base/java.nio.file.Files.newByteChannel(Files.java:371)
java.base/java.nio.file.Files.newByteChannel(Files.java:422)
java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
java.base/java.nio.file.Files.newInputStream(Files.java:156)
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
... 50 more

Here is the Dockerfile we used:

FROM gcr.io/dataflow-templates-base/python38-template-launcher-base
RUN apt-get update && \
    apt-get install -y openjdk-11-jdk

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64


ARG WORKDIR=/dataflow/python/using_flex_template_adv3
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY . .
COPY trust.jks /tmp/trust.jks

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
ENV PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
RUN apt-get update \
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $PYTHON_REQUIREMENTS_FILE \
    # Download the requirements to speed up launching the Dataflow job.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $PYTHON_REQUIREMENTS_FILE

RUN pip install --no-cache-dir apache-beam[gcp]==2.43.0
ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

Code Used

from apache_beam.io.kafka import ReadFromKafka

read = p_input | "Read Json from Kafka" >> ReadFromKafka(
    consumer_config=self.consumer_config,
    topics=[self.topic],
    commit_offset_in_finalize=False,
    with_metadata=False
)

consumer_config contents:

bootstrap.servers: bs:9095
group.id: Group1
ssl.truststore.location: /tmp/trust.jks
sasl.mechanism: SCRAM-SHA-512
security.protocol: SASL_SSL
sasl.jaas.config: ""
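For completeness, assuming the config is assembled directly in Python (values are the placeholders from above, with the JAAS config redacted), it would look roughly like this:

consumer_config = {
    "bootstrap.servers": "bs:9095",
    "group.id": "Group1",
    "ssl.truststore.location": "/tmp/trust.jks",  # path baked into the image
    "sasl.mechanism": "SCRAM-SHA-512",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": "",  # redacted
}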

We are running this pipeline on GCP Dataflow with the Apache Beam Python SDK.

EDIT

Turns out the worker VM and the launcher VM are different, and the worker VM won't have the same environment as the launcher.

To solve this I had to use the sdk_container_image pipeline option, which configures a custom container image for the worker VMs; the steps are in the custom-containers guide linked in the answer below.
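For reference, a launch command along these lines worked (the bucket, project, image name, and region are placeholders; sdk_container_image is the pipeline option that selects the worker image):

gcloud dataflow flex-template run "kafka-read-job" \
    --template-file-gcs-location gs://MY_BUCKET/templates/kafka_template.json \
    --region us-central1 \
    --parameters sdk_container_image=gcr.io/MY_PROJECT/beam-worker-with-truststore:latest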


Solution

  • Can you share the command you use to start the Dataflow job?

    From the Dockerfile, it looks like you are copying the file into the template launcher image instead of the worker image.

    The template launcher is responsible for starting a Flex Template job, whereas the workers are responsible for executing the work items. It looks like you want the file to be present on the workers rather than on the template launcher.

    It is also clear from the error that the file is not present on the worker:

    Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    

    Please follow the process specified here to use custom containers on the worker: https://cloud.google.com/dataflow/docs/guides/using-custom-containers (a minimal worker image is sketched after the note below).

    Note: Custom containers are only supported for pipelines using Dataflow Runner v2.
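    As a rough sketch, the worker image only needs a Beam SDK base image plus the truststore; the tag below assumes the Beam 2.43.0 / Python 3.8 combination from the question's Dockerfile, so adjust as needed:

    FROM apache/beam_python3.8_sdk:2.43.0

    # Bake the truststore into the worker image at the path that
    # ssl.truststore.location points to (/tmp/trust.jks).
    COPY trust.jks /tmp/trust.jks

    Build and push this image to a registry the job can pull from, then pass its URI through the sdk_container_image pipeline option.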