spring-batch spring-cloud-dataflow spring-cloud-task spring-cloud-dataflow-ui

Spring Cloud Data Flow Task Persist Arguments Between Executions


I'm experimenting with SCDF and successfully running Spring Batch Jobs as Tasks, but I'm having an issue with Task Arguments not persisting. It seems that each time I execute the Task I have to provide the command line arguments again.

In my use case I need the command line arguments to be set once and for all for a Task.

Thank you


Solution

  • After some research I ended up using the Parameters instead of the Arguments.

    1. First I created a Spring Batch application with two CommandLineRunners: one for production, which reads application properties that the SCDF Parameters override, and one for the other environments (DEV, ...), which is launched with plain command line arguments or through the API.

    2. First CommandLineRunner:

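      import java.util.Arrays;

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.batch.core.Job;
      import org.springframework.batch.core.JobParametersBuilder;
      import org.springframework.batch.core.JobParametersInvalidException;
      import org.springframework.batch.core.launch.JobLauncher;
      import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
      import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
      import org.springframework.batch.core.repository.JobRestartException;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.boot.CommandLineRunner;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.annotation.Profile;
      import org.springframework.stereotype.Component;

      // Production runner: the job name and its arguments come from
      // application properties, which the SCDF Parameters override.
      @Component
      @Slf4j
      @Profile("prod")
      public class ProdJobCommandLineRunner implements CommandLineRunner {

          @Value("${jobname}")
          private String jobName;

          @Value("${argument1}")
          private String argument1;

          @Value("${argument2}")
          private String argument2;

          @Autowired
          private ApplicationContext context;

          @Autowired
          private JobLauncher jobLauncher;

          @Override
          public void run(String... args) {

              log.info("Begin Launching Job with Args {}", Arrays.asList(args));
              log.info("JOB NAME: {}", jobName);

              // Only launch when at least one command line argument was passed
              if (args.length > 0) {

                  // Job parameters are taken from the injected properties, not from args
                  JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
                  jobParametersBuilder.addString("argument1", argument1);
                  jobParametersBuilder.addString("argument2", argument2);

                  try {
                      // Look up the Job bean by the configured name
                      Job job = (Job) context.getBean(jobName);

                      jobLauncher.run(job, jobParametersBuilder.toJobParameters());

                  } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                      log.error("Exception ", e);
                  }
              }
              log.info("End Launching Job with Args {}", Arrays.asList(args));
          }
      }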

    3. Second CommandLineRunner:

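      import java.time.LocalDateTime;
      import java.util.Arrays;
      import java.util.HashMap;
      import java.util.Map;

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.batch.core.Job;
      import org.springframework.batch.core.JobParametersBuilder;
      import org.springframework.batch.core.JobParametersInvalidException;
      import org.springframework.batch.core.launch.JobLauncher;
      import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
      import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
      import org.springframework.batch.core.repository.JobRestartException;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.CommandLineRunner;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.annotation.Profile;
      import org.springframework.stereotype.Component;
      import org.springframework.util.StringUtils;

      // Runner for every non-production profile: the job name and options
      // are read from key=value command line arguments.
      @Component
      @Slf4j
      @Profile("!prod")
      public class DefaultJobCommandLineRunner implements CommandLineRunner {

          @Autowired
          private ApplicationContext context;

          @Autowired
          private JobLauncher jobLauncher;

          @Override
          public void run(String... args) {

              log.info("Begin Launching Job with Args {}", Arrays.asList(args));

              // Only launch when at least one command line argument was passed
              if (args.length > 0) {

                  Map<String, String> params = parseJobArgs(args);
                  JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

                  // A timestamp parameter makes the job parameters unique, forcing a new run
                  if (Boolean.parseBoolean(params.getOrDefault("force_restart", "false"))) {
                      jobParametersBuilder.addString("force_restart", LocalDateTime.now().toString());
                  }

                  try {
                      String jobName = params.get("job_name");

                      log.info("JOB NAME: {}", jobName);

                      // Look up the Job bean by the name given on the command line
                      Job job = (Job) context.getBean(jobName);

                      jobLauncher.run(job, jobParametersBuilder.toJobParameters());

                  } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                      log.error("Exception ", e);
                  }
              }
              log.info("End Launching Job with Args {}", Arrays.asList(args));
          }

          // Parses key=value arguments into a map
          private Map<String, String> parseJobArgs(String[] args) {
              Map<String, String> params = new HashMap<>();

              for (String arg : args) {
                  // Limit of 2 keeps any '=' inside the value; args without '=' are skipped
                  String[] keyValue = arg.split("=", 2);
                  if (keyValue.length == 2) {
                      params.put(StringUtils.trimAllWhitespace(keyValue[0]),
                              StringUtils.trimAllWhitespace(keyValue[1]));
                  }
              }
              return params;
          }
      }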

    4. Import the app into SCDF, for example under the name TESTAPP.

    5. Create multiple Tasks from the same imported app, one per use case.

    6. The first time each task is launched, set its Parameters using the naming convention: app.<APP_NAME>.<property key>=<property value>

    In this case, for example: app.TESTAPP.jobname=JOB_NAME
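To make the naming convention concrete, here is a minimal plain-Java sketch of how a parameter such as app.TESTAPP.jobname=JOB_NAME relates to the `jobname` key that `@Value("${jobname}")` reads in the production runner. The class `ScdfAppPropertyDemo`, its two helper methods, and the `argument1=value1` pair are hypothetical names made up for illustration; this only mimics the prefix convention and is not SCDF's own code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that only illustrates the app.<APP_NAME>.<key>=<value> convention.
class ScdfAppPropertyDemo {

    // Builds a launch parameter in the form app.<APP_NAME>.<key>=<value>.
    static String toScdfProperty(String appName, String key, String value) {
        return "app." + appName + "." + key + "=" + value;
    }

    // Strips the "app.<APP_NAME>." prefix to show the key the application resolves.
    // Entries for other apps or without '=' are ignored.
    static Map<String, String> toApplicationProperties(String appName, String... scdfProperties) {
        String prefix = "app." + appName + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (String property : scdfProperties) {
            String[] keyValue = property.split("=", 2);
            if (keyValue.length == 2 && keyValue[0].startsWith(prefix)) {
                result.put(keyValue[0].substring(prefix.length()), keyValue[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String jobNameParam = toScdfProperty("TESTAPP", "jobname", "JOB_NAME");
        String argument1Param = toScdfProperty("TESTAPP", "argument1", "value1"); // made-up value

        System.out.println(jobNameParam); // prints app.TESTAPP.jobname=JOB_NAME
        System.out.println(toApplicationProperties("TESTAPP", jobNameParam, argument1Param));
        // prints {jobname=JOB_NAME, argument1=value1}
    }
}
```

Each task created from TESTAPP gets its own set of these parameters, which is why one imported app can serve several use cases.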

    I hope this helps.