databricks, azure-databricks, databricks-sql, tpc

Databricks SQL: Type Inference Challenges Using `COPY INTO`


I'm currently working on executing TPC-DS Benchmark queries in a Databricks SQL warehouse. The process involves generating the data locally as CSV files, creating a Databricks database, and creating Delta tables from the TPC-DS schema. To populate these tables, I've opted to upload the CSV files to DBFS and use the COPY INTO command to load the data into the Delta tables.

Challenge:

My primary obstacle is type inference during data loading. Without setting the "inferSchema" option to true, Databricks reads all columns as strings, which leads to errors from incompatible types. But even with "inferSchema" and "mergeSchema" set to true, I still run into issues: for example, a column defined as an integer in the schema is inferred as a string when the corresponding CSV column is empty.

Schema sample:

create table call_center
(
cc_call_center_sk         integer               not null,
cc_call_center_id         char(16)              not null,
cc_rec_start_date         date                          ,
cc_rec_end_date           date                          ,
cc_closed_date_sk         integer                       ,
cc_open_date_sk           integer                       ,
cc_name                   varchar(50)                   ,
...          
);

CSV sample:

cc_call_center_sk|cc_call_center_id|cc_rec_start_date|cc_rec_end_date|cc_closed_date_sk|cc_open_date_sk|cc_name|cc_class|cc_employees|cc_sq_ft|cc_hours|cc_manager|cc_mkt_id|cc_mkt_class|cc_mkt_desc|cc_market_manager|cc_division|cc_division_name|cc_company|cc_company_name|cc_street_number|cc_street_name|cc_street_type|cc_suite_number|cc_city|cc_county|cc_state|cc_zip|cc_country|cc_gmt_offset|cc_tax_percentage
1|AAAAAAAABAAAAAAA|1998-01-01|||2450952|NY Metro|large|2|1138|8AM-4PM|Bob Belcher|6|More than other authori|Shared others could not count fully dollars. New members ca|Julius Tran|3|pri|6|cally|730|Ash Hill|Boulevard|Suite 0|Midway|Williamson County|TN|31904|United States|-5|0.11|
2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450806|Mid Atlantic|medium|6|2268|8AM-8AM|Felipe Perkins|2|A bit narrow forms matter animals. Consist|Largely blank years put substantially deaf, new others. Question|Julius Durham|5|anti|1|ought|984|Center Hill|Way|Suite 70|Midway|Williamson County|TN|31904|United States|-5|0.12|
3|AAAAAAAACAAAAAAA|2001-01-01|||2450806|Mid Atlantic|medium|6|4134|8AM-4PM|Mark Hightower|2|Wrong troops shall work sometimes in a opti|Largely blank years put substantially deaf, new others. Question|Julius Durham|1|ought|2|able|984|Center Hill|Way|Suite 70|Midway|Williamson County|TN|31904|United States|-5|0.01|

I am trying to use the COPY INTO command in the following way:

COPY INTO tpc_ds.call_center
FROM '/FileStore/tables/call_center.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ('sep' = '|',
'inferSchema' = 'true',
'mergeSchema' = 'true',
'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

An example of the error I get is the following (the majority of the tables fail):

databricks.sql.exc.ServerOperationError: Failed to merge fields 'cc_closed_date_sk' and 'cc_closed_date_sk'. Failed to merge incompatible data types IntegerType and StringType.

Question

What modifications or extra steps do I need to make to load all the data into the Delta tables? Are there specific best practices or optimizations for using the COPY INTO command in Databricks SQL for improved compatibility?


Solution

  • The COPY INTO command supports nested subqueries that can be used to perform explicit casting, and this can even be more performant than relying on inferSchema, which reads the same data twice (once to infer the schema and once to load the data).

    See the COPY INTO examples page, specifically this example that uses ::<type> to cast the data:

    COPY INTO delta.`s3://bucket/deltaTables/target`
      FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
            FROM 's3://bucket/base/path')
      FILEFORMAT = CSV
      PATTERN = 'folder1/file_[a-g].csv'
    

    If you need to cast only specific columns, you can cast just those and pass the rest through with a * except(cast columns) expression, like this (a sketch applying this to the call_center table from the question follows):

    COPY INTO delta.`s3://bucket/deltaTables/target`
      FROM (SELECT col1::bigint key, col2::int index, * except(col1, col2)
            FROM 's3://bucket/base/path')
      FILEFORMAT = CSV
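
    Applied to the call_center table from the question, a sketch could look like the one below. Only the non-string columns visible in the truncated DDL above are cast explicitly; any other non-string columns in the full schema would need the same treatment. The DBFS path, separator, and header option are taken from the question, and the aliases keep the output column names identical to the target column names, following the documentation examples above.

    COPY INTO tpc_ds.call_center
      FROM (SELECT
              cc_call_center_sk::integer  AS cc_call_center_sk,
              cc_rec_start_date::date     AS cc_rec_start_date,
              cc_rec_end_date::date       AS cc_rec_end_date,
              cc_closed_date_sk::integer  AS cc_closed_date_sk,
              cc_open_date_sk::integer    AS cc_open_date_sk,
              -- cast the remaining non-string columns of the full DDL the same way,
              -- then pass everything else through as-is:
              * except(cc_call_center_sk, cc_rec_start_date, cc_rec_end_date,
                       cc_closed_date_sk, cc_open_date_sk)
            FROM '/FileStore/tables/call_center.csv')
      FILEFORMAT = CSV
      FORMAT_OPTIONS ('sep' = '|', 'header' = 'true');

    With the casts in place, the 'inferSchema', 'mergeSchema', and COPY_OPTIONS ('mergeSchema' = 'true') settings from the original command should no longer be necessary, since the subquery already produces the types declared in the table.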