I'm trying to extract data from a SQL Server table to the Parquet file format using the sqlalchemy, pandas, and fastparquet modules, but I end up with an exception. I'd appreciate some help on this. I'm testing against a simple table with a single non-null integer column.
Code:
import sqlalchemy as sa
import pandas as pd
import urllib as ul
import fastparquet as fp

def main():
    sqlInstance = 'sqlInstance'
    database = 'database'
    tableName = 'Numbers'
    props = ul.parse.quote_plus("DRIVER={SQL Server Native Client 11.0};"
                                "SERVER=" + sqlInstance + ";"
                                "DATABASE=" + database + ";"
                                "Trusted_Connection=yes;")
    con = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(props))
    fetch_batch_size = 1000
    metadata = sa.schema.MetaData(bind=con)
    table = sa.Table(tableName, metadata, autoload=True)

    # Generate pandas/python compatible datatype mapping
    map = {}
    data_type_map_lookup = {
        'int64': ['smallint', 'tinyint', 'integer'],
        'float': ['bigint', 'float', 'real'],
        'str': ['char', 'nchar', 'nvarchar', 'nvarchar(max)', 'uniqueidentifier', 'varchar(n)', 'varchar(max)'],
        'datetime64[ns]': ['date', 'datetime', 'smalldatetime'],
        'bytes': ['binary', 'varbinary', 'varbinary(max)'],
        'bool': ['bit']
    }
    for key, types in data_type_map_lookup.items():
        for col in table.columns:
            if list(filter(str(col.type).lower().startswith, types)):
                if col.nullable and key == 'int64':
                    map[col.name] = 'float'
                else:
                    map[col.name] = key

    # Fetch data
    output = table.select().execution_options(stream_results=True).execute()
    append = False
    while True:
        batch = output.fetchmany(fetch_batch_size)
        if not len(batch) > 0:
            break
        else:
            df = (pd.DataFrame(data=batch, columns=map)).astype(dtype=map)
            print(df.to_string())  # Prints good
            fp.write("C:\\temp\\test.parquet", df, write_index=False, compression=False, append=append)
            append = True

if __name__ == "__main__":
    main()
Exception:
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
I experienced the same issue and found a workaround.
Here is a minimal reproducible example (using python==3.10.7, pandas==1.5.3, fastparquet==2023.4.0, SQLAlchemy==1.4.47):
import tempfile

import numpy as np
import pandas as pd
import fastparquet
from sqlalchemy.sql.expression import quoted_name

with tempfile.TemporaryDirectory() as tempdir:
    # Test data
    df = pd.DataFrame({"col_i": np.arange(10), "col_f": np.random.random(10)})
    outpath = f"{tempdir}/test_data.parquet"

    # Base case: fastparquet IO works no problem with the test data
    fastparquet.write(outpath, df)
    df_out = fastparquet.ParquetFile(outpath).to_pandas()
    assert df.equals(df_out)

    # Write the data to SQL, and then load it back
    con, table_name = f"sqlite:///{tempdir}/test_data.sqlite3", "table123"
    df.to_sql(table_name, con, index=False)
    df_sql = pd.read_sql_table(table_name, con)
    # ...and it still looks to be fine
    assert df.equals(df_sql)

    # However, the column names are not plain `str`.
    # They are actually instances of :class:`quoted_name`, which is a
    # subclass of `str`.
    for column_name in df_sql.columns:
        assert isinstance(column_name, quoted_name)

    # This will cause a failure while writing the file.
    # The exception is caught and printed to stderr, but then ignored.
    fastparquet.write(outpath, df_sql)

    # This line will raise an error, since the Parquet file is broken.
    df_out = fastparquet.ParquetFile(outpath).to_pandas()
This appears to be because the DataFrame column names are not plain str when the data is read through SQLAlchemy.
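You can see the mismatch directly. This is a minimal sketch assuming SQLAlchemy 1.4's two-argument quoted_name constructor; the exact-type requirement is my reading of the error message, since an isinstance check would happily accept the subclass:

from sqlalchemy.sql.expression import quoted_name

# Hypothetical illustration: build a quoted_name by hand
name = quoted_name("col_i", None)
print(isinstance(name, str))  # True  - quoted_name subclasses str
print(type(name) is str)      # False - not exactly str, which appears to be
                              #         what fastparquet's writer rejects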
Here is the workaround that worked for me:
# Make sure that the column names are plain str
df.columns = [str(name) for name in df.columns]
# Now, fastparquet can write the DataFrame correctly
fastparquet.write(outpath, df)
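The same normalization can also be written as a one-liner on the column Index. In the code from the question it would go on each batch DataFrame just before the fp.write call (a sketch of the same fix, not tested against SQL Server):

# Convert the Index of quoted_name labels to plain str labels
df.columns = df.columns.astype(str)
fp.write("C:\\temp\\test.parquet", df, write_index=False, compression=False, append=append)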