Tags: python, pyarrow

Parquet with null columns on Pyarrow


I'm reading a table from PostgreSQL using pandas.read_sql, then converting it to a PyArrow table and saving it, partitioned, to the local filesystem.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Filename for each partition file (uses the enclosing table_schema/table_name/date)
def basename_file(date_partition):
    basename_file = f"{table_schema}.{table_name}-{date}.parquet"
    return basename_file

# Retrieve schema.table data from the database and write it partitioned by date_partition
def get_table_data(table_schema, table_name, date):
    s  = ""
    s += "SELECT"
    s += " *"
    s += " , date(created_on) as date_partition"
    s += " FROM {table_schema}.{table_name}"
    s += " WHERE created_on = '{date}';" 
    sql = s.format(table_schema = table_schema, table_name = table_name, date = date)
#     print(sql)

    df = pd.read_sql(sql, db_conn)
    result = pa.Table.from_pandas(df)
    pq.write_to_dataset(result,
                        root_path = f"{dir_name}",
                        partition_cols = ['date_partition'],
                        partition_filename_cb = basename_file,
                        use_legacy_dataset = True
                       )
#     print(result)
    return df

The problem is that my SELECT returns a column where some rows are null. When I partition this for writing (write_to_dataset) to the local filesystem, a few files end up containing only rows where that column is null, so those partitioned Parquet files don't have this column.

When I try to read back across multiple partitions, I get a schema error because one of the columns cannot be cast correctly.
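
For reference, the read that fails looks roughly like this (a minimal sketch using the pyarrow.dataset API; my exact call may differ):

import pyarrow.dataset as ds

# Read every partition under the directory written by get_table_data
dataset = ds.dataset(dir_name, format="parquet", partitioning="hive")
table = dataset.to_table()  # raises a schema/cast error when some partitions are all-null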

Why is that? Is there any setting I could apply to write_to_dataset to handle this? I've been looking for a workaround without success. My main goal is to export data daily, partitioned by transaction date, and then read back data from any period without worrying about schema evolution: values in the null columns should simply appear as null.


Solution

  • If you can post the exact error message, that might be more helpful. I did some experiments with pyarrow 6.0.1 and found that things work OK as long as the first file contains some valid values for all columns (pyarrow will use this first file to infer the schema for the entire dataset).

    The "first" file is not technically well defined when doing dataset discovery but, at the moment, for a local dataset it should be the first file in alphabetical order.

    If the first file does not have values for all columns then I get the following error:

    Error: Unsupported cast from string to null using function cast_null

    I'm a bit surprised, as this sort of cast should be pretty easy (to cast to null, just throw away all the data). That being said, you probably don't want all your data thrown away anyway.

    The easiest solution is to provide the full expected schema when you are creating your dataset. If you do not know it ahead of time, you can figure it out yourself by inspecting all of the files in the dataset and using pyarrow's unify_schemas. I have an example of doing this in this answer.
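
    Roughly, that approach could look like the sketch below (my own illustration, not the linked answer; the root path is a placeholder): collect every file's schema with pq.read_schema, merge them with unify_schemas, and hand the merged schema to the dataset factory.

    import glob

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # Placeholder path; point this at your own dataset directory
    root = '/tmp/my_partitioned_dataset'

    # Read each file's schema; all-null columns show up with type "null"
    schemas = [pq.read_schema(path)
               for path in glob.glob(f'{root}/**/*.parquet', recursive=True)]

    # Merge the schemas; a null-typed field picks up its real type from the
    # files that do have values for that column
    full_schema = pa.unify_schemas(schemas)

    # Discover the dataset with the unified schema so every fragment is read
    # (and cast) consistently; for a hive-partitioned directory you would also
    # pass partitioning='hive' and include the partition field in the schema
    dataset = ds.dataset(root, schema=full_schema)
    print(dataset.to_table())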

    Here is some code demonstrating my findings:

    import os
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    import pyarrow.dataset as ds
    
    tab = pa.Table.from_pydict({'x': [1, 2, 3], 'y': [None, None, None]})
    tab2 = pa.Table.from_pydict({'x': [4, 5, 6], 'y': ['x', 'y', 'z']})
    
    os.makedirs('/tmp/null_first_dataset', exist_ok=True)
    pq.write_table(tab, '/tmp/null_first_dataset/0.parquet')
    pq.write_table(tab2, '/tmp/null_first_dataset/1.parquet')
    
    os.makedirs('/tmp/null_second_dataset', exist_ok=True)
    pq.write_table(tab, '/tmp/null_second_dataset/1.parquet')
    pq.write_table(tab2, '/tmp/null_second_dataset/0.parquet')
    
    try:
        dataset = ds.dataset('/tmp/null_first_dataset')
        tab = dataset.to_table()
        print(f'Was able to read in null_first_dataset without schema.')
        print(tab)
    except Exception as ex:
        print('Was not able to read in null_first_dataset without schema')
        print(f'  Error: {ex}')
    print()
    
    try:
        dataset = ds.dataset('/tmp/null_second_dataset')
        tab = dataset.to_table()
        print(f'Was able to read in null_second_dataset without schema.')
        print(tab)
    except Exception as ex:
        print('Was not able to read in null_second_dataset without schema')
        print(f'  Error: {ex}')
    print()
    
    dataset = ds.dataset('/tmp/null_first_dataset', schema=tab2.schema)
    tab = dataset.to_table()
    print(f'Was able to read in null_first_dataset by specifying schema.')
    print(tab)
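
    If you would rather head this off on the write side, a minimal sketch for the question's workflow (my assumption, and the column names below are placeholders) is to build the table against an explicit schema before calling write_to_dataset, so a partition whose column is entirely null is still written with the column's real type:

    # Hypothetical schema for the query result, including the partition column;
    # reuses df, dir_name and basename_file from the question's code
    expected_schema = pa.schema([
        ('id', pa.int64()),                 # placeholder column
        ('comment', pa.string()),           # stays string even when every value is null
        ('created_on', pa.timestamp('ns')),
        ('date_partition', pa.date32())
    ])

    result = pa.Table.from_pandas(df, schema=expected_schema, preserve_index=False)
    pq.write_to_dataset(result,
                        root_path=dir_name,
                        partition_cols=['date_partition'],
                        partition_filename_cb=basename_file,
                        use_legacy_dataset=True)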