java google-cloud-platform google-cloud-dataflow google-bucket

Google Dataflow trigger from Google bucket upload?


I am currently evaluating a proof of concept that uses a Google Cloud Storage bucket, a Java microservice, and Dataflow.

The communication flow is like so:

  1. A user sends a CSV file to a third-party service
  2. The service uploads the CSV file to the Google Cloud Storage bucket with an ID and filename
  3. A create event is triggered and sent as an HTTP request to the Java microservice
  4. The Java service triggers a Google Dataflow job

I am starting to think that the Java service is not necessary and that I could trigger Dataflow directly once the CSV is uploaded to the bucket.

This is the service. As you can see, it's just a basic controller that validates the request params from the "create" trigger and then delegates to the Dataflow service:

@PostMapping(value = "/dataflow", produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Object> triggerDataFlowJob(@RequestBody Map<String, Object> body) {
    Map<String, String> requestParams = getRequestParams(body);
    log.atInfo().log("Body %s", requestParams);

    // the "create" notification carries the bucket name and the object (file) name
    String bucket = requestParams.get("bucket");
    String fileName = requestParams.get("name");

    if (Objects.isNull(bucket) || Objects.isNull(fileName)) {
        AuditLogger.log(AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getCode(), AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getAuditText());
        return ResponseEntity.accepted().build();
    }

    log.atInfo().log("Triggering a Dataflow job, using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
    try {
        // launch the Flex Template job via the Dataflow REST client
        LaunchFlexTemplateResponse response = DataflowTransport
                .newDataflowClient(options)
                .build()
                .projects()
                .locations()
                .flexTemplates()
                .launch(gcpProjectIdProvider.getProjectId(),
                        dataflowProperties.getRegion(),
                        launchFlexTemplateRequest)
                .execute();
        return ResponseEntity.ok(response);
    } catch (Exception ex) {
        if (ex instanceof GoogleJsonResponseException && ((GoogleJsonResponseException) ex).getStatusCode() == 409) {
            // 409 means a job with the same name already exists, i.e. this file was already processed
            log.atInfo().log("Dataflow job already triggered using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
        } else {
            log.atSevere().withCause(ex).log("Error while launching dataflow jobs");
            AuditLogger.log(AuditCode.LAUNCH_DATAFLOW_JOB.getCode(), AuditCode.LAUNCH_DATAFLOW_JOB.getAuditText());
        }
    }

    return ResponseEntity.accepted().build();
}

Is there a way to integrate Cloud Storage triggers directly with Dataflow?


Solution

  • When a file is uploaded to Cloud Storage, you can trigger a Cloud Function (2nd gen) with Eventarc.

    Then in this Cloud Function, you can trigger a Dataflow job.

    gcloud functions deploy your_function_name \
      --gen2 \
      --runtime=python311 \
      --region=YOUR_REGION \
      --source=. \
      --entry-point=startDataflowProcess \
      --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
      --trigger-event-filters="bucket=YOUR_STORAGE_BUCKET"
    
    import functions_framework
    from googleapiclient.discovery import build

    # triggered by Eventarc when an object is finalized in the bucket (2nd gen function)
    @functions_framework.cloud_event
    def startDataflowProcess(cloud_event):
        # the CloudEvent payload contains the Cloud Storage object metadata
        data = cloud_event.data
        # replace with your project ID
        project = "grounded-pivot-266616"
        # Dataflow job names may only contain lowercase letters, digits and hyphens
        job = (project + "-" + str(data['timeCreated'])).lower().replace(":", "-").replace(".", "-")
        # path of the Dataflow template on the Cloud Storage bucket
        template = "gs://sample-bucket/sample-template"
        inputFile = "gs://" + str(data['bucket']) + "/" + str(data['name'])
        # user-defined parameters to pass to the Dataflow pipeline job
        parameters = {
            'inputFile': inputFile,
        }
        # tempLocation is the path on GCS to store temp files generated during the Dataflow job
        environment = {'tempLocation': 'gs://sample-bucket/temp-location'}

        service = build('dataflow', 'v1b3', cache_discovery=False)
        # the locations() API lets you choose the region the Dataflow job runs in
        request = service.projects().locations().templates().launch(
            projectId=project,
            gcsPath=template,
            location='europe-west1',
            body={
                'jobName': job,
                'parameters': parameters,
                'environment': environment
            },
        )
        response = request.execute()
        print(str(response))
    

    This Cloud Function shows an example in Python, but you can keep your logic in Java if you prefer.
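
    If you want to stay in Java, the 2nd gen Cloud Function can be written with the Functions
    Framework for Java instead. Below is a minimal sketch, assuming functions-framework-api and
    Gson are on the classpath; the launchFlexTemplate(...) helper is hypothetical and stands in
    for the same flexTemplates().launch(...) call already shown in your controller.

    import com.google.cloud.functions.CloudEventsFunction;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import io.cloudevents.CloudEvent;
    import java.nio.charset.StandardCharsets;
    import java.util.logging.Logger;

    public class GcsToDataflowFunction implements CloudEventsFunction {

        private static final Logger logger = Logger.getLogger(GcsToDataflowFunction.class.getName());

        @Override
        public void accept(CloudEvent event) throws Exception {
            // the Eventarc storage event payload is a JSON document with the object metadata
            String json = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
            JsonObject object = JsonParser.parseString(json).getAsJsonObject();

            String bucket = object.get("bucket").getAsString();
            String fileName = object.get("name").getAsString();
            logger.info("Launching Dataflow job for gs://" + bucket + "/" + fileName);

            // hypothetical helper: reuse the flexTemplates().launch(...) chain from the
            // Spring controller above, without the HTTP layer around it
            // launchFlexTemplate(bucket, fileName);
        }
    }

    With this approach the Spring controller (and the service hosting it) is no longer needed:
    Eventarc delivers the "object finalized" event straight to the function, which launches the
    Dataflow job.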