I'm new to BigQuery and I'm trying to insert multiple rows into a BigQuery table while also modifying another table, all in a single transaction.
I'm using the BigQuery Java client from Kotlin, but I can't figure out how to set the session ID for the insertAll request.
I start by beginning a transaction, which also creates a new session and gives me its session ID:
val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
    .setCreateSession(true)
    .build()
// runQuery just creates a job and waits for execution to end
val sessionId = runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId
I then set the session ID on the first request, which updates the other table:
val queryConfig2 = QueryJobConfiguration.newBuilder("SOME_SQL_STATEMENT")
    .setCreateSession(false)
    .setConnectionProperties(listOf(ConnectionProperty.of("session_id", sessionId)))
    .setNamedParameters(values)
    .setUseLegacySql(false)
    .build()
runQuery(queryConfig2)
Finally, I send the InsertAllRequest, but I don't know how to set the session ID here:
bigquery.insertAll(
    InsertAllRequest.newBuilder(tableId)
        .setRows(rowContents)
        .build()
)
All of this is currently wrapped within this little routine:
suspend fun runRoutine(source: String, : String, name: String, records: List<RowToInsert>) {
    val context = "context"
    val id = "id"
    val latestRefreshAt = "latestRefreshAt"
    transaction { configureSession ->
        query(
            """
            UPDATE `${googleTableId1.toSqlTableName()}`
            SET $latestRefreshAt = CURRENT_TIMESTAMP()
            WHERE $context = @$context AND $id = @$id
            """,
            mapOf(
                context to QueryParameterValue.string(source),
                id to QueryParameterValue.string(name),
            ),
            configureSession
        )
        tableInsertRows(
            tableId = googleTableId2,
            rowContents = records,
        )
    }
}
suspend fun transaction(block: suspend BigQuery.(QueryJobConfiguration.Builder.() -> Unit) -> Unit) {
    val sessionId = beginTransaction()
    // Attaches a query job to the session opened by BEGIN TRANSACTION
    val prepareQuery: QueryJobConfiguration.Builder.() -> Unit = {
        setCreateSession(false)
        setConnectionProperties(
            listOf(ConnectionProperty.of("session_id", sessionId))
        )
    }
    try {
        bigquery.block(prepareQuery)
        commitTransaction(prepareQuery)
    } catch (throwable: Throwable) {
        // Don't let a failing rollback hide the original error
        runCatching { rollbackTransaction(prepareQuery) }
        throw throwable
    } finally {
        // Terminate the session whether the transaction committed or not
        runCatching { closeSession(prepareQuery) }
    }
}
private suspend fun beginTransaction(): String {
    val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
        .setCreateSession(true)
        .build()
    return runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId
}
private suspend fun commitTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
    val queryConfig = QueryJobConfiguration.newBuilder("COMMIT TRANSACTION;")
        .apply(prepareQuery)
        .build()
    runQuery(queryConfig)
}
private suspend fun rollbackTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
    val queryConfig = QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;")
        .apply(prepareQuery)
        .build()
    runQuery(queryConfig)
}
private suspend fun closeSession(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
    val queryConfig = QueryJobConfiguration.newBuilder("CALL BQ.ABORT_SESSION();")
        .apply(prepareQuery)
        .build()
    runQuery(queryConfig)
}
private suspend fun runQuery(queryJobConfiguration: QueryJobConfiguration): Job {
    val jobId = JobId.newBuilder().setProject(projectId.value).build()
    val queryJob =
        bigquery.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build())
            .waitFor()
    if (queryJob == null) {
        throw RuntimeException("Job no longer exists")
    } else if (queryJob.getStatus().getError() != null) {
        throw RuntimeException(queryJob.getStatus().getError().toString())
    }
    return queryJob
}
suspend fun query(
    sql: String,
    values: Map<String, QueryParameterValue>,
    configure: QueryJobConfiguration.Builder.() -> Unit = {},
): TableResult {
    val queryConfig = QueryJobConfiguration.newBuilder(sql)
        .apply(configure)
        .setNamedParameters(values)
        .setUseLegacySql(false)
        .build()
    return runQuery(queryConfig).getQueryResults()
}
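suspend fun tableInsertRows(
    tableId: TableId,
    rowContents: Iterable<InsertAllRequest.RowToInsert>,
) {
    val response = bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .setRows(rowContents)
            .build()
    )
    // insertAll reports per-row failures in the response instead of throwing
    if (response.hasErrors()) {
        throw RuntimeException(response.insertErrors.toString())
    }
}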
private val bigquery by lazy {
    BigQueryOptions.getDefaultInstance().service
}
So my question is: how do I set a session ID on an insertAll request?
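As far as I know, you can't set a session ID on an insertAll request with the BigQuery Java client. insertAll uses the streaming insert API (tabledata.insertAll) rather than a query job, so there are no connection properties that could carry a session_id, and the streamed rows are not part of your multi-statement transaction. They are written regardless of whether the transaction later commits or rolls back.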
A better alternative is to insert the rows with a parameterized INSERT statement that runs as a query job in the same session, for example through your query(...) helper with configureSession. The INSERT then takes part in the transaction, so either both the UPDATE and the INSERT are committed, or neither is.
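A minimal sketch of that approach, assuming every row in the batch has the same columns. The helper name buildParameterizedInsert and its parameters are hypothetical, not part of the BigQuery client. It turns a list of row maps (for example records.map { it.content }, since RowToInsert.getContent() returns the row as a map) into one multi-row INSERT whose values are all passed as named query parameters. It stays independent of the client library: the caller supplies toParameter to wrap each value.

```kotlin
// Hypothetical helper: builds a multi-row, parameterized INSERT statement.
// Each cell becomes a named parameter @p_<row>_<col>, so no values are spliced into the SQL text.
fun <P> buildParameterizedInsert(
    tableName: String,              // e.g. "project.dataset.table"; wrapped in backticks below
    rows: List<Map<String, Any?>>,  // e.g. records.map { it.content }
    toParameter: (Any?) -> P,       // e.g. { QueryParameterValue.string(it as String?) }
): Pair<String, Map<String, P>> {
    require(rows.isNotEmpty()) { "rows must not be empty" }
    // The first row's keys define the column list; every row must have the same keys
    val columns = rows.first().keys.toList()
    rows.forEachIndexed { i, row ->
        require(row.keys == columns.toSet()) { "row $i has columns ${row.keys}, expected $columns" }
    }

    // Insertion-ordered map of parameter name -> wrapped value
    val parameters = LinkedHashMap<String, P>()
    val valueTuples = rows.mapIndexed { rowIndex, row ->
        columns.mapIndexed { colIndex, column ->
            val name = "p_${rowIndex}_$colIndex"
            parameters[name] = toParameter(row[column])
            "@$name"
        }.joinToString(", ", prefix = "(", postfix = ")")
    }

    val sql = buildString {
        append("INSERT INTO `").append(tableName).append("` (")
        append(columns.joinToString(", ") { "`$it`" })
        append(")\nVALUES ")
        append(valueTuples.joinToString(",\n       "))
    }
    return sql to parameters
}
```

Inside runRoutine's transaction block, you could then replace the tableInsertRows(...) call with query(sql, parameters, configureSession), using a toParameter lambda that picks the QueryParameterValue factory matching each column's type (for example QueryParameterValue.string or QueryParameterValue.int64). For very large batches, BigQuery's query size limits may require splitting the rows into several INSERT statements inside the same transaction.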