java, google-cloud-dataflow, apache-beam, dataflow, apache-beam-io

Unable to create a template


I am trying to create a Dataflow template using the mvn command below. I have a JSON config file in a bucket, and I need to read a different config file for each run (I don't want to hard-code values). The code is shown below.

During template creation I get the following error for all three args passed to the getSchemaJson method: "Value only available at runtime, but accessed from a non-runtime context."

Can someone tell me whether it is possible to pass args at runtime to the getSchemaJson method?

mvn compile exec:java \
-Dexec.mainClass=com.mainClass \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=project \
--stagingLocation=gs://001test_data_flow/staging \
--tempLocation=gs://001test_data_flow/temp \
--templateLocation=gs://001test_data_flow/templates/dataflow.json \
--runner=DataflowRunner \
--region=region"

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.options.ValueProvider;

// Reads the schema config file from GCS and returns its content as a String.
public static String getSchemaJson(ValueProvider<String> projectId, ValueProvider<String> bucketName, ValueProvider<String> filename) {
    try {
        Storage storage = StorageOptions.newBuilder()
                .setProjectId(projectId.get()) // .get(), not String.valueOf(provider)
                .build()
                .getService();
        Blob blob = storage.get(bucketName.get(), filename.get());
        return new String(blob.getContent(), StandardCharsets.UTF_8);
    } catch (Exception e) {
        LOG.error("Exception occurred while processing schema: " + e.getMessage());
        return null;
    }
}
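
For reference, parameters like these are normally declared as ValueProvider options in a PipelineOptions interface. The sketch below is hypothetical (the question doesn't show the interface; only the parameter names come from the method signature above):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical options interface: ValueProvider-typed getters make these
// parameters settable at job launch time instead of template creation time.
public interface SchemaOptions extends DataflowPipelineOptions {
    @Description("GCS bucket holding the schema config file")
    ValueProvider<String> getBucketName();
    void setBucketName(ValueProvider<String> value);

    @Description("Name of the schema config file to read for this run")
    ValueProvider<String> getFilename();
    void setFilename(ValueProvider<String> value);
}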

Solution

  • Your code uses classic Dataflow templates, which have several limitations. The biggest one (and the one affecting you) is that they don't support dynamic parameters during graph construction: any value used while the pipeline graph is being built must already be known when the template is created.

    The reason is that Dataflow compiles the pipeline into a template containing the job's DAG and uploads it to --templateLocation; runtime parameters are only supplied later, when the job is triggered. Every step in the DAG must therefore be fully determined at template creation time. Your getSchemaJson method runs while the graph is being built, so the values of projectId, bucketName, and filename are not yet available, which is exactly what the error says. The classic-template workaround is to defer the ValueProvider.get() calls to execution time, e.g. inside a DoFn, as sketched below.
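
    A minimal sketch of that workaround, using a hypothetical ReadSchemaFn that wraps your getSchemaJson (only the method and its parameter names come from the question):

    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.DoFn;

    // The ValueProviders are stored as fields and only dereferenced inside
    // processElement(), which runs at job execution time. Template creation
    // therefore succeeds, and each run can read a different config file.
    static class ReadSchemaFn extends DoFn<String, String> {
        private final ValueProvider<String> projectId;
        private final ValueProvider<String> bucketName;
        private final ValueProvider<String> filename;

        ReadSchemaFn(ValueProvider<String> projectId,
                     ValueProvider<String> bucketName,
                     ValueProvider<String> filename) {
            this.projectId = projectId;
            this.bucketName = bucketName;
            this.filename = filename;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Safe here: this is a runtime context, so .get() succeeds.
            c.output(getSchemaJson(projectId, bucketName, filename));
        }
    }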

    This is fixed by Flex Templates, where the compiled pipeline graph is generated after the job is triggered. The pipeline code is packaged in a Docker image, and the DAG is built by a separate process (the launcher VM) at launch time. Because graph construction happens there, you can pass dynamic parameters to the job and they are evaluated by the launcher VM.

    So, it's advised to use Flex Templates for production-level pipelines; a typical build-and-run flow with gcloud is sketched below.
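
    As a rough sketch of that workflow (the image path, jar name, and parameter names are placeholders, not values from the question), building and running a Flex Template with gcloud looks something like this:

    gcloud dataflow flex-template build gs://001test_data_flow/templates/flex-spec.json \
      --image-gcr-path "region-docker.pkg.dev/project/repo/schema-pipeline:latest" \
      --sdk-language "JAVA" \
      --flex-template-base-image JAVA11 \
      --jar "target/pipeline-bundled.jar" \
      --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.mainClass"

    gcloud dataflow flex-template run "schema-pipeline-run" \
      --template-file-gcs-location gs://001test_data_flow/templates/flex-spec.json \
      --region region \
      --parameters bucketName=001test_data_flow,filename=config-for-this-run.json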

    I wrote an article on getting started with Flex Templates for a Python codebase, but it should work more or less the same for Java.