python, postgresql, airflow

File not uploading to Postgres with bulk_load (Apache Airflow)


I create a file in a task and then use bulk_load to load it into the database. The code runs without errors, but nothing appears in the database. What could be the problem?

Code that I think doesn't work:

path = r'/opt/airflow/dags/asd/data/'
with open(os.path.join(path + "{}.csv".format(get_dates())), 'w+', encoding='utf-8') as f:
    f.write(resultcsv)
    fname = f.name
    try:
        hook = PostgresHook(postgres_conn_id='test_db')
        hook.bulk_load('testdb', fname)
    finally:
        os.remove(fname)

Airflow log:

{postgres.py:168} INFO - Running copy expert: COPY testdb FROM STDIN, filename: /opt/airflow/dags/asd/data/2024-08-15.csv
{base.py:73} INFO - Using connection ID 'test_db' for task execution.
{python.py:194} INFO - Done. Returned value was: None
{taskinstance.py:1400} INFO - Marking task as SUCCESS.
{local_task_job_runner.py:228} INFO - Task exited with return code 0

I tried saving the file in different formats (CSV and TSV), but it didn't help.


Solution

  • You're calling bulk_load while the file is still open, so its contents may not have been flushed to disk yet when Postgres tries to read them. Move the load outside the with statement, like this, so the file is closed (and its buffer flushed) before the copy runs:

    import os

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    path = r'/opt/airflow/dags/asd/data/'
    with open(os.path.join(path + "{}.csv".format(get_dates())), 'w+', encoding='utf-8') as f:
        f.write(resultcsv)
        fname = f.name
    # the with block has ended here, so the file is closed and its contents are on disk

    try:
        hook = PostgresHook(postgres_conn_id='test_db')
        hook.bulk_load('testdb', fname)
    finally:
        os.remove(fname)
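
  • If the load really has to stay inside the with statement, flushing the file before calling bulk_load should achieve the same effect. The following is a minimal sketch under that assumption; it reuses get_dates(), resultcsv, and the test_db connection from the snippets above:

    import os

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    path = r'/opt/airflow/dags/asd/data/'
    fname = os.path.join(path, "{}.csv".format(get_dates()))

    try:
        with open(fname, 'w+', encoding='utf-8') as f:
            f.write(resultcsv)
            f.flush()             # push Python's write buffer to the OS
            os.fsync(f.fileno())  # make sure the bytes are on disk before the copy

            hook = PostgresHook(postgres_conn_id='test_db')
            hook.bulk_load('testdb', fname)  # bulk_load re-opens the file by name
    finally:
        os.remove(fname)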