I'm evaluating DBT for a possible usecase, and everything seems just fine except for one single situation. This is when the source table has struct fields.
I'm using Spark Thrift Server connector
, underlying data is stored as parquet
in S3
. DBT version is 0.20
This is a piece of the source table create sentence, as you can se there are struct fields in there.
CREATE TABLE `<someSchema>`.`<sourceTable>` (
`properties` STRUCT<`site`: STRING>,
`channel` STRING,
`timestamp` STRING,
`anotherDate` STRING,
`aDate ` STRING)
USING parquet
PARTITIONED BY (aDate)
LOCATION 's3a://<someBucket>'
My model just perform a select over that table with certain where clauses. First time it is run, it works just fine, it creates a table exactly the same as the original with some minor changes, just as expected, even with the struct fields.
Here is a piece of the sink table create table
CREATE TABLE `<someSchema>`.`dbtsink` (
`properties` STRUCT<`site`: STRING>,
`channel` STRING,
`timestamp` STRING,
`anotherDate ` STRING,
`aDate` STRING)
USING parquet
PARTITIONED BY (anotherDate)
My problem comes when I run dbt again with some other values in the where clauses, it should create another partition in sink table. Query compilation is just fine
It raises this error:
Runtime Error in model dbtsink (models/anotherDate/dbtsink.sql)
Database Error
Error running query: org.apache.spark.sql.AnalysisException: cannot resolve '`site`' given input columns: [dbtsink__dbt_tmp.channel, dbtsink__dbt_tmp.anotherDate, dbtsink__dbt_tmp.aDate, dbtsink__dbt_tmp.properties, dbtsink__dbt_tmp.timestamp]; line 4 pos 25;
'InsertIntoStatement 'UnresolvedRelation [someSchema, dbtsink], false, false
+- 'Project [properties#6526, 'site, channel#6527, timestamp#6528, aDate#6541, anotherDate#6540]
+- SubqueryAlias dbtsink__dbt_tmp
+- Project [properties#6526, channel#6527, timestamp#6528, anotherDate#6540, aDate#6541]
+- Filter (((aDate#6541 > 2021060100) AND (aDate#6541 <= 2021070609)) AND (anotherDate#6540 = 2021070609))
+- SubqueryAlias spark_catalog.someSchema.sourceTable
+- Relation[context#6524,traits#6525,properties#6526,channel#6527,timestamp#6528,projectId#6529,integrations#6530,messageId#6531,originalTimestamp#6532,receivedAt#6533,sentAt#6534,userId#6535,anonymousId#6536,type#6537,providerId#6538,version#6539,anotherDate#6540,aDate#6541] parquet
It seems that it's trying to read or write the inner field of the struct as a root field. I tested with other struct fields and it happens the same I just want the struct as such, as in the very first execution. As I said, it only happens in the second execution.
This is my model's query, it is very simple
select
properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'
If I change the select to convert properties from struct to json with to_json(properties)
it works just as expected, generating a new partition.
If something wrong with structs in DBT? I am doing something wrong?
I am using incremental materialization and I tested it with append andn insert_overwrite, that didn't seem to be the problem
The problem is related to how DBT is trying to parse the columns in a Spark table using a REGEX. See the parse_columns_from_information function.
You can not use a REGEX for parsing a table schema. That function is using the results provided by this Spark SQL statement: show table extended in someSchema like '*'
. When using that statement you get something like this for your table schema:
Schema: root
|-- properties: struct (nullable = true)
| |-- site: string (nullable = true)
|-- channel: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- anotherDate: string (nullable = true)
|-- aDate: string (nullable = true)
Applying that REGEX to the above string will mess up your columns as you have experienced.
You can work around this problem using parse_describe_extended. This function is using the results provided by this Spark SQL statement: describe extended someSchema.dbtsink
. In order to use
parse_describe_extended
you need to disable the DBT cache (something that could be harmful) For disabling the DBT cache you can use this dbt argument: --bypass-cache
.