I have a Kafka-enabled Azure Event Hub that I'm trying to connect to from Google Cloud's Dataflow service to stream the data into Google BigQuery. I can successfully use the Kafka CLI to talk to the Azure Event Hub. However, with GCP, after 5 minutes, I get timeout errors in the GCP Dataflow job window.
Azure EH w/ Kafka enabled -> GCP Dataflow -> GCP BigQuery table
To set up the Kafka-enabled Event Hub, I followed the details on this GitHub page. It has the developer add a jaas.conf and a client_common.properties file. The jaas.conf includes a reference to the login module along with a username/password. The username for Event Hubs with Kafka is $ConnectionString. The password is the connection string copied from the CLI. The client_common.properties contains two flags: security.protocol=SASL_SSL and sasl.mechanism=PLAIN. By configuring these files, I'm able to send and receive data using the Kafka CLI tools and the Azure Event Hub. I can see the data streaming from the producer to the consumer through the Azure Event Hub.
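The same login module, username and password can also be written as a single sasl.jaas.config value, which is the form a Kafka client accepts as a property instead of a jaas.conf file. The following is a minimal sketch of that string format only; the class and method names (EventHubsJaas, jaasConfig) are hypothetical, and rejecting double quotes is a simplifying assumption rather than a rule from the Event Hubs documentation.

```java
final class EventHubsJaas {
    private static final String LOGIN_MODULE =
        "org.apache.kafka.common.security.plain.PlainLoginModule";

    /**
     * Builds a sasl.jaas.config value: the username is the literal text
     * $ConnectionString and the password is the Event Hubs connection string.
     */
    static String jaasConfig(String connectionString) {
        if (connectionString == null || connectionString.isEmpty()) {
            throw new IllegalArgumentException("connection string must not be empty");
        }
        // Assumption for this sketch: refuse values that would need escaping.
        if (connectionString.indexOf('"') >= 0) {
            throw new IllegalArgumentException("connection string must not contain a double quote");
        }
        return LOGIN_MODULE + " required"
            + " username=\"$ConnectionString\""
            + " password=\"" + connectionString + "\";";
    }
}
```

Calling EventHubsJaas.jaasConfig with the connection string copied from the CLI yields one line that can be used wherever a sasl.jaas.config property is expected.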
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
(echo -n "1|"; cat message.json | jq . -c) | kafka-console-producer.sh --topic test-event-hub --broker-list test-eh-namespace.servicebus.windows.net:9093 --producer.config client_common.properties --property "parse.key=true" --property "key.separator=|"
kafka-console-consumer.sh --topic test-event-hub --bootstrap-server test-eh-namespace.servicebus.windows.net:9093 --consumer.config client_common.properties --property "print.key=true"
# prints: 1 { "transaction_time": "2020-07-20 15:14:54", "first_name": "Joe", "last_name": "Smith" }
I modified Google's Dataflow template for Kafka -> BigQuery. There was already a configuration map specified for resetting the offset. I added additional configuration to match the Azure Event Hubs with Kafka tutorial. While not best practice, I put the connection string directly in the password field for testing. When I upload it to the GCP Dataflow engine and run the job, I get timeout errors every 5 minutes in the log and nothing ends up in Google BigQuery.
gcloud dataflow jobs run kafka-test --gcs-location=<removed> --region=us-east1 --worker-zone=us-east4-a --parameters bootstrapServers=test-eh-namespace.servicebus.servicebus.windows.net:9093,inputTopic=test-event-hub,outputTableSpec=project:Kafka_Test.test --service-account-email my-service-account.iam.gserviceaccount.com
# these errors show up in the worker logs
Operation ongoing in step ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds for at least 05m00s without outputting or completing in state process
  at java.lang.Thread.sleep(Native Method)
  at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:45)
  at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:366)
  at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1481)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.updatedSpecWithAssignedPartitions(KafkaUnboundedSource.java:85)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:125)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
  at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReader.iterator(WorkerCustomSources.java:433)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:186)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Execution of work for computation 'S4' on key '0000000000000001' failed with uncaught exception. Work will be retried locally.
# this error shows up in the Job log
Error message from worker: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
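A timeout while fetching topic metadata does not by itself say whether the broker is unreachable from the worker or whether the client and broker disagree on security settings. A minimal sketch for ruling out the first case is shown here, assuming it is run from a machine on the same network as the Dataflow workers. BrokerProbe and its methods are hypothetical names, and the probe only checks that a TCP connection and TLS handshake succeed on the bootstrap address; it does not attempt SASL authentication.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

final class BrokerProbe {
    /** Returns the host part of a host:port bootstrap address. */
    static String hostOf(String bootstrapServer) {
        int colon = bootstrapServer.lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("expected host:port, got " + bootstrapServer);
        }
        return bootstrapServer.substring(0, colon);
    }

    /** Returns the port part of a host:port bootstrap address. */
    static int portOf(String bootstrapServer) {
        int colon = bootstrapServer.lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("expected host:port, got " + bootstrapServer);
        }
        return Integer.parseInt(bootstrapServer.substring(colon + 1));
    }

    /** True if a TCP connection and a TLS handshake both complete within the timeout. */
    static boolean tlsReachable(String bootstrapServer, int timeoutMillis) {
        String host = hostOf(bootstrapServer);
        int port = portOf(bootstrapServer);
        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
        try (Socket raw = new Socket()) {
            raw.connect(new InetSocketAddress(host, port), timeoutMillis);
            raw.setSoTimeout(timeoutMillis);
            // Layer TLS over the connected socket; closing tls also closes raw.
            try (SSLSocket tls = (SSLSocket) factory.createSocket(raw, host, port, true)) {
                tls.startHandshake();
                return true;
            }
        } catch (IOException e) {
            return false;
        }
    }
}
```

If BrokerProbe.tlsReachable("test-eh-namespace.servicebus.windows.net:9093", 10000) returns true, the network path is probably fine and the consumer's security properties are the next thing to check.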
Map<String, Object> props = new HashMap<>();
// azure event hub authentication
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<removed>\";");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
props.put("request.timeout.ms", 60000);
props.put("session.timeout.ms", 15000);
props.put("max.poll.interval.ms", 30000);
props.put("offset.metadata.max.bytes", 1024);
props.put("connections.max.idle.ms", 180000);
props.put("metadata.max.age.ms", 180000);
PCollectionTuple convertedTableRows =
    pipeline
        /*
         * Step #1: Read messages in from Kafka
         */
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                // withConsumerConfigUpdates takes the Map<String, Object> directly
                .withConsumerConfigUpdates(props)
                .withBootstrapServers(options.getBootstrapServers())
                .withTopics(topicsList)
                .withKeyDeserializerAndCoder(
                    StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                .withValueDeserializerAndCoder(
                    StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                .withoutMetadata())
        /*
         * Step #2: Transform the Kafka Messages into TableRows
         */
        .apply("ConvertMessageToTableRow", new MessageToTableRow(options));
This application has a complex build process that was ported over from the GCP Dataflow templates. The build process pulls in the GCP Dataflow Docker image construction and deployment scripts as dependencies. Simply clone the repo to get started.
The first step is to set the environment variables that configure the build and deployment scripts for the given application.
export PROJECT=test-project
export IMAGE_NAME=test-project
export BUCKET_NAME=gs://test-project
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
export BASE_CONTAINER_IMAGE_VERSION=latest
export TEMPLATE_MODULE=kafka-to-bigquery
export APP_ROOT=/template/${TEMPLATE_MODULE}
export COMMAND_SPEC=${APP_ROOT}/resources/${TEMPLATE_MODULE}-command-spec.json
export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json
export BOOTSTRAP=<event_grid_name>.servicebus.windows.net:9093
export TOPICS=<event_grid_topic_name>
export OUTPUT_TABLE=test-project:<schema>.test
export AUTHENTICATION_STRING="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"<EVENT_GRID_TOPIC_APP_SECRET>\";"
Before building, you will need to update the ./kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java file with the following additions to handle the authentication string:
public class KafkaToBigQuery {
    public interface Options extends PipelineOptions {
        // ... existing template options stay as they are ...
        @Description("Kafka Authentication String")
        @Required
        String getAuthenticationString();
        void setAuthenticationString(String authenticationString);
    }
    public static PipelineResult run(Options options) {
        // Consumer settings for a Kafka-enabled Azure Event Hub
        Map<String, Object> props = new HashMap<>();
        props.put("sasl.mechanism", "PLAIN");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", options.getAuthenticationString());
        // https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
        props.put("request.timeout.ms", 60000);
        props.put("session.timeout.ms", 15000);
        props.put("max.poll.interval.ms", 30000);
        props.put("offset.metadata.max.bytes", 1024);
        props.put("connections.max.idle.ms", 180000);
        props.put("metadata.max.age.ms", 180000);
        PCollectionTuple convertedTableRows =
            pipeline
                /*
                 * Step #1: Read messages in from Kafka
                 */
                .apply(
                    "ReadFromKafka",
                    KafkaIO.<String, String>read()
                        .withConsumerConfigUpdates(props)
                        .withBootstrapServers(options.getBootstrapServers())
                        .withTopics(topicsList)
                        .withKeyDeserializerAndCoder(
                            StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                        .withValueDeserializerAndCoder(
                            StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                        .withoutMetadata())
        // ... the rest of the template's run() method is unchanged ...
    }
}
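The consumer settings above can also be gathered in one helper that fails fast when the authentication string is malformed, which is easier to debug than a metadata timeout five minutes into the job. This is a minimal sketch and not part of the template: EventHubsConsumerConfig and build are hypothetical names, and the two checks are assumptions based on the string format used in this answer (a literal $ConnectionString username, which an unescaped shell variable would turn into an empty value, and a trailing semicolon).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class EventHubsConsumerConfig {
    /** Returns the consumer properties used above, after sanity-checking the JAAS string. */
    static Map<String, Object> build(String authenticationString) {
        if (authenticationString == null
                || !authenticationString.contains("username=\"$ConnectionString\"")) {
            // Typical cause: the shell expanded $ConnectionString to an empty string.
            throw new IllegalArgumentException(
                "authentication string must contain username=\"$ConnectionString\"");
        }
        if (!authenticationString.trim().endsWith(";")) {
            throw new IllegalArgumentException("authentication string must end with ';'");
        }
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("sasl.mechanism", "PLAIN");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", authenticationString);
        // Values taken from the Event Hubs for Kafka configuration notes linked above.
        props.put("request.timeout.ms", 60000);
        props.put("session.timeout.ms", 15000);
        props.put("max.poll.interval.ms", 30000);
        props.put("offset.metadata.max.bytes", 1024);
        props.put("connections.max.idle.ms", 180000);
        props.put("metadata.max.age.ms", 180000);
        return Collections.unmodifiableMap(props);
    }
}
```

In run(), the result of EventHubsConsumerConfig.build(options.getAuthenticationString()) could be passed to withConsumerConfigUpdates in place of the hand-built props map.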
Once you have set up the project and changed the file, the next phase is building the Docker image to upload to Google's Container Registry. This command also builds the common files that interact with miscellaneous Google services. If the build is successful, the container will be pushed into Google Container Registry (GCR). From GCR, you can deploy into Google Dataflow.
mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
-Dbase-container-image=${BASE_CONTAINER_IMAGE} \
-Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
-Dapp-root=${APP_ROOT} \
-Dcommand-spec=${COMMAND_SPEC} \
-am -pl ${TEMPLATE_MODULE}
Before launching the project in Dataflow, the Dataflow runner needs a Flex Template to know how to execute the project. The Flex Template is a JSON metadata file that contains the parameters and instructions needed to construct the GCP Dataflow application. It must be uploaded to Google Cloud Storage (GCS), to the bucket set up by the environment variables, at the path given by TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json.
{
  "image": "gcr.io/<my-project-url>:latest",
  "metadata": {
    "name": "Streaming data generator",
    "description": "Generates Synthetic data as per user specified schema at a fixed QPS and writes to Sink of user choice.",
    "parameters": [
      {
        "name": "authenticationString",
        "label": "Kafka Event Hub Authentication String",
        "helpText": "The authentication string for the Azure Event Hub",
        "isOptional": false,
        "regexes": [
          ".+"
        ],
        "paramType": "TEXT"
      },
      {
        "name": "bootstrapServers",
        "label": "Kafka Broker IP",
        "helpText": "The Kafka broker IP",
        "isOptional": false,
        "regexes": [
          ".+"
        ],
        "paramType": "TEXT"
      },
      {
        "name": "inputTopics",
        "label": "PubSub Topic name",
        "helpText": "The name of the topic to which the pipeline should publish data. For example, projects/<project-id>/topics/<topic-name> - should match the Event Grid Topic",
        "isOptional": false,
        "regexes": [
          ".+"
        ],
        "paramType": "PUBSUB_TOPIC"
      },
      {
        "name": "outputTableSpec",
        "label": "Output BigQuery table",
        "helpText": "Output BigQuery table. For example, <project>:<dataset>.<table_name>. Mandatory when sinkType is BIGQUERY.",
        "isOptional": false,
        "regexes": [
          ".+:.+\\..+"
        ],
        "paramType": "TEXT"
      },
      {
        "name": "outputDeadletterTable",
        "label": "Output Deadletter table",
        "helpText": "Output Deadletter table. For example, <project>:<dataset>.<table_name>",
        "isOptional": true,
        "regexes": [
          ".+:.+\\..+"
        ],
        "paramType": "TEXT"
      }
    ]
  },
  "sdk_info": {
    "language": "JAVA"
  }
}
Once you have an image uploaded to GCP and have uploaded a Flex Template, you can launch the Dataflow application. The parameters must match the parameters included in the Flex Template's metadata section.
export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
gcloud beta dataflow flex-template run ${JOB_NAME} \
--project=${PROJECT} --region=us-east1 \
--template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
--parameters ^~^outputTableSpec=${OUTPUT_TABLE}~inputTopics=${TOPICS}~bootstrapServers=${BOOTSTRAP}~authenticationString="${AUTHENTICATION_STRING}" \
--verbosity=info \
--service-account-email=<service_account_to_execute_service>
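The ^~^ prefix in the --parameters value is gcloud's alternate-delimiter syntax: it tells gcloud to split the key=value pairs on ~ instead of the default comma. A minimal sketch of how that argument is assembled is shown here, assuming the values themselves never contain the chosen delimiter. FlexTemplateParameters and render are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.StringJoiner;

final class FlexTemplateParameters {
    /**
     * Renders key=value pairs as one gcloud --parameters argument that uses
     * the given delimiter, announced with the ^DELIM^ prefix.
     */
    static String render(Map<String, String> params, char delimiter) {
        String d = String.valueOf(delimiter);
        StringJoiner joined = new StringJoiner(d, "^" + d + "^", "");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (entry.getValue().indexOf(delimiter) >= 0) {
                // A value containing the delimiter would be split in the wrong place.
                throw new IllegalArgumentException(
                    "value of " + entry.getKey() + " contains the delimiter " + d);
            }
            joined.add(entry.getKey() + "=" + entry.getValue());
        }
        return joined.toString();
    }
}
```

With an insertion-ordered map holding outputTableSpec, inputTopics, bootstrapServers and authenticationString, render(params, '~') produces the same shape as the argument in the command above.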
Once you run this command, check the GCP Cloud Console to view the status. The Dataflow job should now be running, pulling messages from the Azure Event Hub and inserting them into Google BigQuery.
The GCP repo assumes that Google BigQuery/Dataflow will dynamically create the tables with the correct schema, but YMMV, as I found this finicky. The workaround is to create the schema in Google BigQuery before running the Dataflow job.
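One way to prepare that schema is to write it out as a JSON schema file and create the table from it before launching the job. The sketch below only renders such a file. BigQuerySchemaFile and render are hypothetical names, the column names in the usage note come from the sample message in the question, and the column types are assumptions that should be adjusted to the real data.

```java
import java.util.Map;
import java.util.StringJoiner;

final class BigQuerySchemaFile {
    /** Renders column name -> BigQuery type pairs as a JSON schema array of NULLABLE fields. */
    static String render(Map<String, String> columns) {
        StringJoiner fields = new StringJoiner(",\n", "[\n", "\n]");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            fields.add("  {\"name\": \"" + column.getKey()
                + "\", \"type\": \"" + column.getValue()
                + "\", \"mode\": \"NULLABLE\"}");
        }
        return fields.toString();
    }
}
```

For the sample message, the map might hold transaction_time, first_name and last_name, all typed as STRING (an assumption). The rendered text can be saved to a file such as schema.json and, as far as I know, passed to bq mk --table together with the table name.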