Process
In BigQuery, Google Analytics data is linked to a BigQuery project. Each site corresponds with a dataset, and each day an events_*
table is added to that dataset. My query dynamically iterates over all datasets and tables, performs a UNION
of them all updating when new event_*
tables are added, daily, to a destination table.
Issue
One of the fields in the events_*
tables changed a few days ago. The field: collected_traffic_source
had 4 new columns added to its STRUCT
. Older events_*
tables don't have the extra fields which causes this error
Query error: Query column 27 has type STRUCT<manual_campaign_id STRING, manual_campaign_name STRING, manual_source STRING, ...> which cannot be inserted into column collected_traffic_source, which has type STRUCT<manual_campaign_id STRING, manual_campaign_name STRING, manual_source STRING, ...> at [3:3]
I recreated the unioned table with the additional columns, but get the following error due to older tables not having them when I rerun the query.
Query error: Inserted row has wrong column count; Has 28, expected 32 at [3:3]
Question
Is it possible to update the query so it handles based on the number of columns for the collected_traffic_source
STRUCT
and INSERT
based on that?
Query
DECLARE dataset_name STRING;
DECLARE dataset_counter INT64;
-- Get list of datasets matching the pattern 'analytics_xxx'
CREATE TEMP TABLE datasets AS (
SELECT ROW_NUMBER() OVER() rownumber, schema_name AS dataset_name
FROM google-analytics-prod.INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name LIKE 'analytics_%'
);
SET dataset_counter = 1;
-- Iterate through datasets
WHILE dataset_counter <= (SELECT MAX(rownumber) FROM datasets) DO
-- Fetch the next dataset name
SET dataset_name = (SELECT dataset_name FROM datasets WHERE rownumber = dataset_counter);
--Error occurs here
EXECUTE IMMEDIATE FORMAT('''
INSERT INTO bi.production.google_analytics_brandsites
SELECT "%s" AS dataset_brandsite, _TABLE_SUFFIX AS events_tbl_name, PARSE_DATE('%%Y%%m%%d', t.event_date) event_date_formatted,t.event_date, t.event_timestamp, t.event_name, t.event_params, t.event_previous_timestamp, t.event_value_in_usd, t.event_bundle_sequence_id, t.event_server_timestamp_offset, t.user_id, t.user_pseudo_id, t.privacy_info, t.user_properties, t.user_first_touch_timestamp, t.user_ltv, t.device, t.geo, t.app_info, t.traffic_source, t.stream_id, t.platform, t.event_dimensions, t.ecommerce, t.items, t.collected_traffic_source, t.is_active_user FROM google-analytics-prod.%s.events_* AS t
WHERE NOT EXISTS(
SELECT 1
FROM bi.production.google_analytics_brandsites AS d
WHERE t._TABLE_SUFFIX = d.events_tbl_name
AND
"%s" = d.dataset_brandsite
)
''', dataset_name, dataset_name, dataset_name)
;
SET dataset_counter = (dataset_counter + 1);
END WHILE;
Using INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
, allowed handling of the STRUCT
dynamically, utilizing conditionals. This changed the query from dynamically iterating over just the datasets and tables within a project, to also dynamically iterating over each table and its field paths. Finally, determining the number of columns in a field path for the STRUCT
and directing to a conditional.
DECLARE dataset_name STRING;
DECLARE dataset_counter INT64;
DECLARE table_name_ STRING;
DECLARE table_counter INT64;
DECLARE collected_traffic_source_exists INT64;
-- Get list of datasets matching the pattern 'analytics_xxx'
CREATE TEMP TABLE datasets AS (
SELECT ROW_NUMBER() OVER() rownumber, schema_name AS dataset_name
FROM google-analytics-prod.INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name LIKE 'analytics_%'
);
SET dataset_counter = 1;
-- Iterate through datasets
WHILE dataset_counter <= (SELECT MAX(rownumber) FROM datasets) DO
-- Fetch the next dataset name
SET dataset_name = (SELECT dataset_name FROM datasets WHERE rownumber = dataset_counter);
-- Dynamically iterate through a datasets tables
EXECUTE IMMEDIATE FORMAT("""
CREATE TEMP TABLE tables_brandsite_31 AS (
SELECT ROW_NUMBER() OVER() rownumber, table_name
FROM `google-analytics-prod.%s`.INFORMATION_SCHEMA.TABLES
WHERE table_name LIKE 'events_%%'
AND
table_schema = '%s')
""", dataset_name, dataset_name)
;
SET table_counter = 1;
WHILE table_counter <= (SELECT MAX(rownumber) from tables_brandsite_31) DO
SET table_name_ = (SELECT table_name FROM tables_brandsite_31 WHERE rownumber = table_counter);
--Determine number of fields within a STRUCT
EXECUTE IMMEDIATE FORMAT('''
CREATE TEMP TABLE tables_brandsite_31_fields AS (
SELECT COUNT(DISTINCT field_path) fieldpath FROM `google-analytics-prod.%s`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
WHERE table_schema = "%s"
and
column_name = 'collected_traffic_source'
AND
table_name = "%s")
''', dataset_name, dataset_name, table_name_)
;
SET collected_traffic_source_exists = (SELECT fieldpath from tables_brandsite_31_fields
)
;
-- Conditional for if the expected fields exist
IF collected_traffic_source_exists = 13 THEN
EXECUTE IMMEDIATE FORMAT('''
INSERT INTO `analytics.production.google_analytics`
SELECT "%s" AS dataset_brandsite, "%s" AS events_tbl_name, PARSE_DATE('%%Y%%m%%d', t.event_date) event_date_formatted,t.event_date, t.event_timestamp, t.event_name, t.event_params, t.event_previous_timestamp, t.event_value_in_usd, t.event_bundle_sequence_id, t.event_server_timestamp_offset, t.user_id, t.user_pseudo_id, t.privacy_info, t.user_properties, t.user_first_touch_timestamp, t.user_ltv, t.device, t.geo, t.app_info, t.traffic_source, t.stream_id, t.platform, t.event_dimensions, t.ecommerce, t.items, t.collected_traffic_source, t.is_active_user, t.is_active_user, t.batch_event_index, t.batch_page_id, t.batch_ordering_id FROM `google-analytics-prod.%s.%s` AS t
WHERE NOT EXISTS(
SELECT 1
FROM `analytics.production.google_analytics` AS d
WHERE d.events_tbl_name = "%s"
AND
"%s" = d.dataset_brandsite
)
''', dataset_name, table_name_, dataset_name, table_name_,table_name_,dataset_name)
;ELSE
--handle the older tables without the field paths of structs by filling with NULL
EXECUTE IMMEDIATE FORMAT('''
INSERT INTO `analytics.production.google_analytics`
SELECT "%s" AS dataset_brandsite, "%s" AS events_tbl_name, PARSE_DATE('%%Y%%m%%d', t.event_date) event_date_formatted,t.event_date, t.event_timestamp, t.event_name, t.event_params, t.event_previous_timestamp, t.event_value_in_usd, t.event_bundle_sequence_id, t.event_server_timestamp_offset, t.user_id, t.user_pseudo_id, t.privacy_info, t.user_properties, t.user_first_touch_timestamp, t.user_ltv, t.device, t.geo, t.app_info, t.traffic_source, t.stream_id, t.platform, t.event_dimensions, t.ecommerce, t.items,
STRUCT(t.collected_traffic_source.manual_campaign_id,
t.collected_traffic_source.manual_campaign_name,
t.collected_traffic_source.manual_source,
t.collected_traffic_source.manual_content,
NULL AS manual_source_platform,
NULL AS manual_creative_format,
NULL AS manual_marketing_tactic,
t.collected_traffic_source.gclid,
t.collected_traffic_source.dclid,
t.collected_traffic_source.srsltid) AS collected_traffic_source,
t.is_active_user,
FROM `google-analytics-prod.%s.%s` AS t
WHERE NOT EXISTS(
SELECT 1
FROM `analytics.production.google_analytics` AS d
WHERE d.events_tbl_name = "%s"
AND
d.dataset_brandsite = "%s"
)
''', dataset_name, table_name_, dataset_name, table_name_, table_name_,dataset_name)
;
END IF;
DROP TABLE IF EXISTS tables_brandsite_31;
DROP TABLE IF EXISTS tables_brandsite_31_fields;
SET table_counter = (table_counter +1);
END WHILE;
SET dataset_counter = (dataset_counter + 1);
END WHILE;