I need to append some new data into an existing table on snowflake. I am using sqlalchemy
as the engine along with pandas data frame to_sql()
. Here is the imports and the script:
import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
import snowflake.sqlalchemy
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy.ext.declarative import declarative_base
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')
columns_df = pd.DataFrame(data.columns.to_list(), columns={'survey_column_name'})
columns_df['survey_id'] = nextval
columns_df = columns_df[['survey_id', 'survey_column_name']]
columns_df.to_sql('SURVEY_METADATA_COLUMN_NAMES',
index = False,
index_label = None,
con = engine,
schema = 'PUBLIC',
if_exists = 'append',
chunksize = 300,
method = pd_writer)
The error I am getting is as follows:
ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090105 (22000): Cannot perform CREATE
TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name. [SQL:
CREATE TABLE "PUBLIC"."SURVEY_METADATA_COLUMN_NAMES" (
survey_id INTEGER,
survey_column_name TEXT )
]
The connections are as follows:
user = input('Your Snowflake username: ')
password = getpass.getpass('Your Snowflake Password: ')
account = 'MY_ACCOUNT'
conn = snowCtx.connect(
user=user,
password=password,
account=account,
database='MY_DB',
schema='PUBLIC',
warehouse='COMPUTE_WH',
role='SYSADMIN'
)
engine = create_engine(
'snowflake://{user}:{password}@{account}/'.format(
user=user,
password=password,
account=account,
database='MY_DB',
schema = 'PUBLIC',
warehouse='COMPUTE_WH',
role='SYSADMIN',
cache_column_metadata=True
)
)
I switched into using write_pandas()
instead:
success, nchunks, nrows, _ = write_pandas(conn,
columns_df,
'SURVEY_METADATA_COLUMN_NAMES',
chunk_size = 300,
schema = 'PUBLIC')
print(success, nchunks, nrows)
if(success):
print(filename+' columns uploaded')
else:
print(filename+' columns were not uploaded')
Which needs pyarrow
library, so I installed it using:
pip install pyarrow
I removed all imports related to sqlalchemy
and kept the following:
import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from snowflake.connector.pandas_tools import write_pandas