My Workflow
Data gets streamed to BigQuery from Pub/Sub using a Cloud Function.
The data stays in the streaming buffer for about 90 minutes, so I cannot run an UPDATE statement on it.
But I need to update the Result column before that time. Please help.
I receive data in Pub/Sub, then a Cloud Function is triggered which inserts the data into BigQuery.
This is the code:
const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

exports.telemetryToBigQuery = async (data, context) => {
  if (!data.data) {
    throw new Error('No telemetry data was provided!');
  }

  // Data comes in as base64
  console.log(`raw data: ${data.data}`);

  // Decode the payload from base64 to string
  const dataDataDecode = Buffer.from(data.data, 'base64').toString();

  // Collect the positions of the semicolons that separate the fields
  const indexesSemicolons = [];
  for (let i = 0; i < dataDataDecode.length; i++) {
    if (dataDataDecode[i] === ';') {
      indexesSemicolons.push(i);
    }
  }

  if (indexesSemicolons.length === 14) {
    const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
    const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
    const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);

    async function insertRowsAsStream() {
      // Inserts the JSON objects into my_dataset:my_table.
      const datasetId = 'put your dataset here';
      const tableId = 'put table id here';
      const rows = [
        {
          Brand: brand,
          Model: model,
          Result: result
        }
      ];

      // Insert data into the table via the streaming API
      await bigquery
        .dataset(datasetId)
        .table(tableId)
        .insert(rows);
      console.log(`Inserted ${rows.length} rows`);
    }
    // Await the insert so the function does not finish before it completes
    await insertRowsAsStream();
  } else {
    console.log('Invalid message');
  }
};
This data stays in the BigQuery streaming buffer for about 90 minutes, but I need to execute an UPDATE query that changes the Result column. This is not allowed and causes an error:
ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError
I need a way to update the Result column before the 90-minute buffer time is up. Can you help me?
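For reference, this is roughly the UPDATE I am trying to run from Node.js (the new value and the filter are just placeholders):

// Roughly the update I am trying to run (placeholder values); it fails
// with the ApiError above while the rows are still in the streaming buffer.
async function updateResult(model, newResult) {
  const [job] = await bigquery.createQueryJob({
    query: `UPDATE \`pti-tag-copy.ContainerData2.voorinfo\`
            SET Result = @newResult
            WHERE Model = @model`,
    params: { model, newResult },
  });
  await job.getQueryResults();
}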
I read the following pages online.
I read the answer to the following question; I think I understand the idea he is describing, but I don't know how to execute it.
If I am correct, he is saying to stream my data to a temporary table and, from there, put it into a permanent table.
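If I sketch what I think he means, using a staging table voorinfo_staging (that name is my assumption), the Cloud Function would stream into the staging table, and a periodic query job would copy the rows into the permanent table. Because the copy is a query job, the copied rows land outside the streaming buffer:

// My sketch of the temporary-table idea: stream into the staging table
// (voorinfo_staging is my assumption), then periodically copy rows into
// the permanent table with a query job.
async function flushStagingToPermanent() {
  const [job] = await bigquery.createQueryJob({
    query: `INSERT INTO \`ContainerData2.voorinfo\` (Brand, Model, Result)
            SELECT Brand, Model, Result
            FROM \`ContainerData2.voorinfo_staging\``,
  });
  await job.getQueryResults(); // wait until the copy is committed
}

What I don't see is how to clean up the staging table afterwards, because a DELETE there would run into the same streaming-buffer error.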
I am now using the BigQuery create-a-job method, following the example found here. It puts the data directly inside the table, so I don't have to wait 90 minutes for the streaming buffer.
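A sketch of that approach (dataset and table names taken from the error message above): instead of table.insert(rows), the function runs a DML INSERT as a query job, so the row is committed directly to the table.

// Insert via a DML query job instead of the streaming API. The row is
// committed straight to the table, so an UPDATE on Result works right away.
async function insertViaQueryJob(brand, model, result) {
  const [job] = await bigquery.createQueryJob({
    query: `INSERT INTO \`ContainerData2.voorinfo\` (Brand, Model, Result)
            VALUES (@brand, @model, @result)`,
    params: { brand, model, result },
  });
  await job.getQueryResults(); // wait for the insert job to finish
}

As far as I understand, the trade-off is that a query job per message is slower and subject to different quotas than the streaming API.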