What is the best way to transfer all records from a BigQuery table to a Cloud SQL table on a daily basis (the expected count is more than 255,801,312 records [255 million] per day)? I know we can create Dataflow pipelines from BQ to Cloud SQL, but with this amount of data the pipeline would run for hours. Is there a better solution within Google Cloud?
Here is a working example of a Workflow. You need to grant enough permissions to your workflow service account (Cloud SQL Admin, BigQuery Data Viewer + Job User, Cloud Storage Admin), and the destination table must already exist in your Cloud SQL instance (I tested with MySQL).
An article with more detail is in the works. Replace the bucket, the project ID, the Cloud SQL instance name (mysql in my case), the query, the table name, and the database schema.
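Because the import does not create the table for you, here is a minimal sketch of a destination table that matches the `SELECT id, email` export used below. The column types are assumptions, so align them with your actual BigQuery schema:

-- Minimal destination table for the CSV import below.
-- Column types are assumptions; adjust them to your real schema.
CREATE TABLE test_schema.workflowimport (
  id BIGINT,
  email VARCHAR(255)
);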
main:
    steps:
        - assignStep:
            assign:
                - bucket: "TODO"
                - projectid: "TODO"
                - prefix: "workflow-import/export"
                - listResult:
                      nextPageToken: ""
        # Export the query result to Cloud Storage as sharded CSV files
        # (the statement is also shown as standalone SQL after the workflow)
        - export-query:
            call: googleapis.bigquery.v2.jobs.query
            args:
                projectId: ${projectid}
                body:
                    query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS SELECT id, email FROM `copy_dataset.name_test`"}
                    useLegacySql: false
        # Import the exported files into Cloud SQL, one Cloud Storage page at a time
        - importfiles:
            call: import_files
            args:
                pagetoken: ${listResult.nextPageToken}
                bucket: ${bucket}
                prefix: ${prefix}
                projectid: ${projectid}
            result: listResult
        # Loop back while the Cloud Storage listing is paginated
        - missing-files:
            switch:
                - condition: ${"nextPageToken" in listResult}
                  next: importfiles

import_files:
    params:
        - pagetoken
        - bucket
        - prefix
        - projectid
    steps:
        # List the exported CSV files under the prefix
        - list-files:
            call: googleapis.storage.v1.objects.list
            args:
                bucket: ${bucket}
                pageToken: ${pagetoken}
                prefix: ${prefix}
            result: listResult
        # Import the files sequentially; each import is awaited before the next starts
        - process-files:
            for:
                value: file
                in: ${listResult.items}
                steps:
                    - wait-import:
                        call: load_file
                        args:
                            projectid: ${projectid}
                            importrequest:
                                importContext:
                                    uri: ${"gs://" + bucket + "/" + file.name}
                                    database: "test_schema"
                                    fileType: CSV
                                    csvImportOptions:
                                        table: "workflowimport"
        - return-step:
            return: ${listResult}

# Trigger a Cloud SQL import and poll the operation until it is DONE
load_file:
    params: [importrequest, projectid]
    steps:
        - callImport:
            call: http.post
            args:
                url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/mysql/import"}
                auth:
                    type: OAuth2
                body: ${importrequest}
            result: operation
        - checkoperation:
            switch:
                - condition: ${operation.body.status != "DONE"}
                  next: wait
            next: completed
        - completed:
            return: "done"
        - wait:
            call: sys.sleep
            args:
                seconds: 5
            next: getoperation
        - getoperation:
            call: http.get
            args:
                url: ${operation.body.selfLink}
                auth:
                    type: OAuth2
            result: operation
            next: checkoperation
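For reference, this is the EXPORT DATA statement that the export-query step submits, written out as standalone SQL you can test in the BigQuery console. BUCKET is a placeholder for your bucket (the workflow builds the URI from the bucket and prefix variables), and you would swap in your own SELECT in place of the id/email example:

-- Export the query result as sharded, headerless CSV files under the prefix.
EXPORT DATA OPTIONS(
  uri = 'gs://BUCKET/workflow-import/export*.csv',
  format = 'CSV',
  overwrite = true,
  header = false
) AS
SELECT id, email
FROM `copy_dataset.name_test`;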
More details are in my Medium article.