python, sql-server, pandas, dask, parquet

Extracting SQL Server table data to parquet file


I'm trying to extract one of the SQL Server table data to parquet file format using sqlalchemy, pandas and fastparquet modules, but end up with an exception. Appreciate some help on this, I'm trying this one on a simple table with one column of non null integer type.

Code:

import sqlalchemy as sa
import pandas as pd
import urllib as ul
import fastparquet as fp

def main():
    """Stream a SQL Server table and write it to a parquet file in batches.

    Connects via pyodbc/SQLAlchemy, reflects the table's schema to build a
    pandas dtype mapping, then fetches rows in chunks and appends each chunk
    to the parquet file with fastparquet.
    """
    sqlInstance = 'sqlInstance'
    database = 'database'
    tableName = 'Numbers'
    props = ul.parse.quote_plus("DRIVER={SQL Server Native Client 11.0};"
                                    "SERVER=" + sqlInstance + ";"
                                    "DATABASE=" + database + ";"
                                    "Trusted_Connection=yes;")
    con = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(props))
    fetch_batch_size = 1000
    metadata = sa.schema.MetaData(bind=con)
    table = sa.Table(tableName, metadata, autoload=True)

    # Generate pandas/python compatible datatype mapping.
    # NOTE: keys are forced to plain `str` because SQLAlchemy exposes column
    # names as `quoted_name` (a str subclass), and fastparquet's thrift writer
    # rejects those with "TypeError: Expected unicode, got quoted_name".
    dtype_map = {}
    data_type_map_lookup = {
        'int64': ['smallint', 'tinyint', 'integer'],
        'float': ['bigint', 'float', 'real'],
        'str': ['char', 'nchar', 'nvarchar', 'nvarchar(max)', 'uniqueidentifier', 'varchar(n)', 'varchar(max)'],
        'datetime64[ns]': ['date', 'datetime', 'smalldatetime'],
        'bytes': ['binary', 'varbinary', 'varbinary(max)'],
        'bool': ['bit']
    }
    for col in table.columns:
        col_type = str(col.type).lower()
        for key, sql_types in data_type_map_lookup.items():
            if any(col_type.startswith(t) for t in sql_types):
                # Nullable integer columns map to float so NaN can hold NULLs.
                if col.nullable and key == 'int64':
                    dtype_map[str(col.name)] = 'float'
                else:
                    dtype_map[str(col.name)] = key

    # Fetch data in batches and append each chunk to the parquet file.
    output = table.select().execution_options(stream_results=True).execute()
    append = False
    while True:
        batch = output.fetchmany(fetch_batch_size)
        if not batch:
            break
        df = pd.DataFrame(data=batch, columns=list(dtype_map)).astype(dtype=dtype_map)
        # Defensive: guarantee plain-str column labels before writing;
        # fastparquet cannot serialize `quoted_name` column names.
        df.columns = [str(name) for name in df.columns]
        fp.write("C:\\temp\\test.parquet", df, write_index=False, compression=False, append=append)
        append = True


if __name__ == "__main__":
    main()

Exception:

TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
  File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
    return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
  File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
    return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name

Solution

  • I experienced the same issue and found a workaround.

    Here is a minimal reproducible code. (using python==3.10.7, pandas==1.5.3, fastparquet==2023.4.0, SQLAlchemy==1.4.47)

    import tempfile
    import numpy as np
    import pandas as pd
    import fastparquet
    from sqlalchemy.sql.expression import quoted_name
    
    with tempfile.TemporaryDirectory() as tempdir:
    
        # Synthetic frame: one int column, one float column
        df = pd.DataFrame({"col_i": np.arange(10), "col_f": np.random.random(10)})
    
        outpath = f"{tempdir}/test_data.parquet"
    
        # Sanity check: a plain pandas frame round-trips through fastparquet
        fastparquet.write(outpath, df)
        df_out = fastparquet.ParquetFile(outpath).to_pandas()
        assert df.equals(df_out)
    
        # Round-trip the same frame through a SQLite database instead
        con, table_name = f"sqlite:///{tempdir}/test_data.sqlite3", "table123"
        df.to_sql(table_name, con, index=False)
        df_sql = pd.read_sql_table(table_name, con)
        # Values survive the SQL round trip unchanged
        assert df.equals(df_sql)
    
        # The catch: SQLAlchemy hands back column labels as
        # :class:`quoted_name` objects (a subclass of `str`),
        # not plain `str` instances.
        for column_name in df_sql.columns:
            assert isinstance(column_name, quoted_name)
    
        # fastparquet trips over those labels while serializing; the
        # error is printed to stderr but swallowed, so write() "succeeds".
        fastparquet.write(outpath, df_sql)
    
        # Reading the resulting (corrupt) Parquet file raises here.
        df_out = fastparquet.ParquetFile(outpath).to_pandas()
    

    This appears to be because the DataFrame column names are not plain str when the data is read through SQLAlchemy.

    Here is the workaround that worked for me.

    # fastparquet cannot serialize SQLAlchemy's `quoted_name` column labels,
    # so coerce every label down to a plain built-in str first
    df.columns = list(map(str, df.columns))
    
    # With plain string column names, the write succeeds
    fastparquet.write(outpath, df)