I am currently evaluating a proof of concept that uses a Google Cloud Storage bucket, a Java microservice and Dataflow.
The communication flow is like so: a CSV file is uploaded to the bucket, the bucket's "Create" (object finalize) notification calls the Java service, and the Java service launches a Dataflow job.
I am starting to think that the Java service is not necessary and that I could call Dataflow directly after the CSV is uploaded to the bucket.
This is the service; as you can see, it is just a basic controller that validates the request params from the "Create" trigger and then delegates to the Dataflow service:
@PostMapping(value = "/dataflow", produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Object> triggerDataFlowJob(@RequestBody Map<String, Object> body) {
    Map<String, String> requestParams = getRequestParams(body);
    log.atInfo().log("Body %s", requestParams);
    String bucket = requestParams.get("bucket");
    String fileName = requestParams.get("name");
    if (Objects.isNull(bucket) || Objects.isNull(fileName)) {
        AuditLogger.log(AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getCode(), AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getAuditText());
        return ResponseEntity.accepted().build();
    }
    log.atInfo().log("Triggering a Dataflow job, using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
    try {
        // Launch the Flex Template and return the Dataflow API response in the HTTP body.
        return ResponseEntity.ok(
                DataflowTransport
                        .newDataflowClient(options)
                        .build()
                        .projects()
                        .locations()
                        .flexTemplates()
                        .launch(gcpProjectIdProvider.getProjectId(),
                                dataflowProperties.getRegion(),
                                launchFlexTemplateRequest)
                        .execute());
    } catch (Exception ex) {
        if (ex instanceof GoogleJsonResponseException && ((GoogleJsonResponseException) ex).getStatusCode() == 409) {
            // A 409 means a job for this file has already been triggered, so it is not treated as an error.
            log.atInfo().log("Dataflow job already triggered using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
        } else {
            log.atSevere().withCause(ex).log("Error while launching dataflow jobs");
            AuditLogger.log(AuditCode.LAUNCH_DATAFLOW_JOB.getCode(), AuditCode.LAUNCH_DATAFLOW_JOB.getAuditText());
        }
    }
    return ResponseEntity.accepted().build();
}
Is there a way to integrate Cloud Storage bucket triggers directly with Dataflow?
When a file is uploaded to Cloud Storage, you can trigger a Cloud Function (2nd gen) with Eventarc:
gcloud functions deploy your_function_name \
--gen2 \
--trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
--trigger-event-filters="bucket=YOUR_STORAGE_BUCKET
Then, in this Cloud Function, you can trigger a Dataflow job with a code sample that looks like this:

def startDataflowProcess(data, context):
    from googleapiclient.discovery import build

    # Replace with your project ID.
    project = "grounded-pivot-266616"
    # Dataflow job names may only contain lowercase letters, digits and hyphens,
    # so sanitise the upload timestamp before using it in the name.
    job = project + "-" + str(data['timeCreated']).lower().replace(':', '-').replace('.', '-')
    # Path of the classic Dataflow template in a Cloud Storage bucket.
    template = "gs://sample-bucket/sample-template"
    inputFile = "gs://" + str(data['bucket']) + "/" + str(data['name'])
    # User-defined parameters to pass to the Dataflow pipeline job.
    parameters = {
        'inputFile': inputFile,
    }
    # tempLocation is the GCS path used for temp files generated during the Dataflow job.
    environment = {'tempLocation': 'gs://sample-bucket/temp-location'}
    service = build('dataflow', 'v1b3', cache_discovery=False)
    # The regional API is used so the job is launched in the chosen location.
    request = service.projects().locations().templates().launch(
        projectId=project,
        gcsPath=template,
        location='europe-west1',
        body={
            'jobName': job,
            'parameters': parameters,
            'environment': environment
        },
    )
    response = request.execute()
    print(str(response))
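Note that because the function is deployed with --gen2 and an Eventarc trigger, the 2nd gen runtime hands the function a CloudEvent rather than the legacy (data, context) pair. A minimal sketch of what the entry point could look like in that case, assuming functions-framework and google-api-python-client are listed in the function's requirements.txt, and that the launch logic above is wrapped in a hypothetical launch_dataflow_job(data) helper:

import functions_framework

@functions_framework.cloud_event
def start_dataflow_process(cloud_event):
    # For google.cloud.storage.object.v1.finalized events the CloudEvent payload
    # carries the same fields the sample above reads: bucket, name and timeCreated.
    data = cloud_event.data
    # Hypothetical helper wrapping the templates().launch(...) call shown above.
    launch_dataflow_job(data)

If you use this form, point --entry-point at start_dataflow_process when deploying.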
This Cloud Function shows an example with Python, but you can keep your logic in Java if you prefer.