I need to create a solution (using Terraform) that checks daily that a BigQuery table is not empty and, if it is empty, publishes an alert to the alerts Slack channel. The difficulty is that the scheduled query configuration offers only 2 notification options, and neither posts to Slack directly.
I'm using the following query to check whether the table is populated. The assertion fails with the given message when no matching row exists:
    ASSERT EXISTS(SELECT 1 FROM `${var.project_id}.${local.dataset_name}.${local.table_name}` WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) AS "Table has no records"
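One detail worth checking in a query like this: an aggregate such as COUNT(*) always returns exactly one row, so EXISTS wrapped around it is true even for an empty table. The following is a minimal sketch of that behavior, assuming SQLite (from Python's standard library) as a local stand-in for BigQuery; the table `t` and the helper `scalar` are hypothetical names used only for illustration.

```python
import sqlite3

# SQLite stands in for BigQuery only so the example runs locally.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (updated_at TEXT)")  # hypothetical, empty table


def scalar(sql: str) -> int:
    """Run a single-value query and return that value."""
    return conn.execute(sql).fetchone()[0]


# COUNT(*) yields one row (containing 0), so EXISTS sees a row and is true.
exists_over_count = scalar("SELECT EXISTS(SELECT COUNT(*) FROM t)")
# Selecting rows directly yields no rows for an empty table, so EXISTS is false.
exists_over_rows = scalar("SELECT EXISTS(SELECT 1 FROM t)")
# Comparing the count itself also detects the empty table.
count_positive = scalar("SELECT (SELECT COUNT(*) FROM t) > 0")

print(exists_over_count, exists_over_rows, count_positive)  # prints: 1 0 0
```

Either of the last two forms distinguishes an empty table from a populated one; the first does not.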
After some digging through the GCP documentation, I found a solution based on the following flow.
A scheduled query checks the BigQuery table daily using the query above. When the run completes, it publishes a notification to GCP Pub/Sub; if the table is empty, the assertion fails and the notification reports the run state as FAILED. A GCP Cloud Function written in Python listens for the Pub/Sub cloud event. When a new message arrives, it consumes the message and, for a failed run, posts an alert to the corresponding Slack channel.
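The message path described above can be sketched locally, without any GCP services. This is a rough illustration only: the function names (`make_envelope`, `decode_run`, `alert_text`) are hypothetical, and the sample notifications are abridged to the single `state` field that the Cloud Function later in this post relies on. Real notifications carry more fields.

```python
import base64
import json
from typing import Optional


def make_envelope(run: dict) -> dict:
    """Wrap a transfer-run dict the way the Pub/Sub event carries it:
    JSON text, base64-encoded, under message.data."""
    raw = json.dumps(run).encode("utf-8")
    return {"message": {"data": base64.b64encode(raw).decode("ascii")}}


def decode_run(envelope: dict) -> dict:
    """Reverse of make_envelope: base64 -> UTF-8 JSON -> dict."""
    raw = base64.b64decode(envelope["message"]["data"])
    return json.loads(raw.decode("utf-8"))


def alert_text(envelope: dict) -> Optional[str]:
    """Return the Slack text for a failed run, or None when no alert is needed."""
    run = decode_run(envelope)
    if run.get("state") == "FAILED":
        return "Table has no records"
    return None


# Hypothetical, abridged notifications for a failed and a successful run.
failed = make_envelope({"state": "FAILED"})
succeeded = make_envelope({"state": "SUCCEEDED"})

print(alert_text(failed))
print(alert_text(succeeded))
```

Only the failed run produces alert text; the successful run yields None, so nothing would be sent to Slack.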
What should be done:
Create a Pub/Sub topic using Terraform
    resource "google_pubsub_topic" "topic-name" {
      name    = "pubsub-topic-name"
      project = var.project_id
    }
Create a scheduled query using Terraform
    resource "google_bigquery_data_transfer_config" "scheduled_query_name" {
      depends_on = [module.bigquery_dataset]

      display_name         = "Check if BigQuery table is not populated"
      location             = var.euw4_region
      service_account_name = module.service_account_name.email
      data_source_id       = "scheduled_query"
      schedule             = "every day 08:00"
      project              = var.project_id

      # Run notifications go to the topic created above.
      # This is a top-level argument, not a query parameter.
      notification_pubsub_topic = google_pubsub_topic.topic-name.id

      params = {
        # The assertion fails when no matching row exists.
        query = "ASSERT EXISTS(SELECT 1 FROM `${var.project_id}.${local.dataset_name}.${local.table_name}` WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) AS \"Table has no records\""
      }
    }
Create a Cloud Function using Python
    import base64
    import json
    import os
    from typing import Dict

    import functions_framework
    from slack_sdk import WebClient

    slack_token = os.environ["SLACK_TOKEN"]
    client = WebClient(token=slack_token)


    @functions_framework.cloud_event
    def process_pubsub_message(event):
        # The Pub/Sub message body is base64-encoded JSON describing the run.
        data: Dict = event.data
        slack_text = base64.b64decode(data["message"]["data"]).decode("utf-8")
        state = extract_state_field(slack_text)
        if state == "FAILED":
            message = "Table has no records. Check catalog sync BigQuery tables"
            print(message)
            # Assumption: alerts go to a channel named "#alerts"; adjust as needed.
            client.chat_postMessage(channel="#alerts", text=message)
        else:
            print("Catalog sync BigQuery tables populated successfully")


    def extract_state_field(json_string):
        # Normalize the escaped "<" and the quoted assertion message before parsing.
        cleaned_json = json_string.replace("\\u003c", "<").replace('\"Table has no records\"', "'Table has no records'")
        data = json.loads(cleaned_json)
        return data.get("state")
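To check the alerting decision without sending anything to Slack, the client can be passed in as a parameter and replaced by a stand-in during a test. The sketch below is one possible arrangement, not the function deployed in this post: `alert_on_failure` is a hypothetical helper, the `#alerts` channel name is an assumption, and `unittest.mock.Mock` takes the place of `slack_sdk.WebClient`.

```python
from unittest.mock import Mock


def alert_on_failure(run: dict, client, channel: str) -> bool:
    """Post to Slack when the decoded transfer run failed; return True if posted."""
    if run.get("state") != "FAILED":
        return False
    client.chat_postMessage(channel=channel, text="Table has no records")
    return True


# A Mock stands in for slack_sdk.WebClient, so no request leaves the machine.
fake_client = Mock()
posted_ok = alert_on_failure({"state": "SUCCEEDED"}, fake_client, "#alerts")
posted_failed = alert_on_failure({"state": "FAILED"}, fake_client, "#alerts")

# Exactly one Slack call should have happened, for the failed run only.
fake_client.chat_postMessage.assert_called_once_with(
    channel="#alerts", text="Table has no records"
)
print(posted_ok, posted_failed)  # prints: False True
```

Passing the client in keeps the decision logic independent of the SLACK_TOKEN environment variable, which is only needed once the real WebClient is constructed.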
Deploy the Cloud Function using GitHub Actions
    gcloud functions deploy "${{ env.CLOUD_FUNCTION_NAME }}" \
      --region "${{ env.REGION }}" \
      --entry-point process_pubsub_message \
      --gen2 \
      --runtime python312 \
      --timeout 60s \
      --project ${{ vars.GCP_PROJECT_ID }} \
      --service-account your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
      --build-service-account projects/${{ vars.GCP_PROJECT_ID }}/serviceAccounts/your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
      --run-service-account your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
      --source ./CloudFunction \
      --trigger-topic pubsub-topic-name \
      --set-env-vars SLACK_TOKEN=${{ secrets.SLACK_TOKEN }}
Also, don't forget to define SLACK_TOKEN in the GitHub repository secrets.