I'm trying to run a very simple test DAG to get the hang of the basic functionality of GCP Cloud Composer, but every time I trigger the DAG, a nasty error pops up and I can't find any information on how to solve it.
The error is:
```
[2020-03-18 22:20:56,627] {taskinstance.py:1059} ERROR - __init__() got an unexpected keyword argument 'min'@-@{"workflow": "notebook-test", "task-id": "notebook-test", "execution-date": "2020-03-18T22:20:41.232043+00:00"}
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/test.py", line 44, in execute_nb
    parameters=params
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/execute.py", line 104, in execute_notebook
    **engine_kwargs
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 49, in execute_notebook_with_engine
    return self.get_engine(engine_name).execute_notebook(nb, kernel_name, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 341, in execute_notebook
    nb_man.notebook_start()
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 69, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 198, in notebook_start
    self.save()
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 69, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 139, in save
    write_ipynb(self.nb, self.output_path)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/iorw.py", line 397, in write_ipynb
    papermill_io.write(nbformat.writes(nb), path)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/iorw.py", line 128, in write
    return self.get_handler(path).write(buf, path)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/iorw.py", line 316, in write
    multiplier=self.RETRY_MULTIPLIER, min=self.RETRY_DELAY, max=self.RETRY_MAX_DELAY
TypeError: __init__() got an unexpected keyword argument 'min'
```
And my DAG's code is:
```python
import airflow
import papermill as pm
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'end_date': None,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    dag_id="notebook-test",
    description="a test",
    default_args=default_args,
    catchup=True,
    schedule_interval=None,
    dagrun_timeout=(timedelta(seconds=30))
)

NB_PATH = "gs://BUCKET/data/"
params = {}

def execute_nb():
    input_nb = NB_PATH + "test.ipynb"
    output_nb = NB_PATH + "test_ran.ipynb"
    pm.execute_notebook(
        input_nb,
        output_nb,
        parameters=params
    )

op = PythonOperator(
    task_id="notebook-test",
    python_callable=execute_nb,
    dag=dag
)

op
```
One solution I already tried, from https://github.com/nteract/papermill/issues/445, was to upgrade Tenacity, but adding it to the PyPI Packages tab of my Cloud Composer environment didn't change anything.
Any help would be appreciated, thanks!
EDIT: The image version is composer-1.9.2-airflow-1.10.6.
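A likely cause of the `TypeError` is a tenacity version mismatch. The last frame shows papermill constructing a tenacity wait strategy with `min=`, and older tenacity releases presumably do not accept that keyword. The Airflow 1.10 line is believed to pin an older tenacity, which might also explain why adding a newer version through the PyPI Packages tab had no effect. Below is a minimal, stdlib-only sketch for checking this from inside the environment, for example by calling it from a PythonOperator task. `accepts_kwarg` is a hypothetical helper, not part of papermill or tenacity. The sketch also assumes that `tenacity.wait_exponential` is the class papermill calls there; the traceback does not name it explicitly.

```python
import inspect


def accepts_kwarg(func, name):
    """Return True if `func` (function or class) can be called with keyword `name`."""
    try:
        params = inspect.signature(func).parameters
    except (TypeError, ValueError):
        # Some builtins expose no introspectable signature.
        return False
    param = params.get(name)
    if param is not None and param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    # A **kwargs catch-all accepts any keyword argument.
    return any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values())


if __name__ == "__main__":
    try:
        import tenacity
    except ImportError:
        print("tenacity is not installed")
    else:
        # Shows which copy of tenacity the worker actually imports.
        print("tenacity imported from:", tenacity.__file__)
        # Assumption: wait_exponential is the strategy papermill builds.
        print("wait_exponential accepts min:", accepts_kwarg(tenacity.wait_exponential, "min"))
```

If this prints `False`, the installed tenacity is older than the one the installed papermill expects.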
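So it turns out the issue had to do with the path being supplied.
I had to add `import os` as well as `from pathlib import Path`, and then define my variable as

```python
NB_PATH = str(Path(os.path.abspath(__file__)).parents[1]) + "/data/"
```

(note the trailing slash, since `execute_nb` appends the file names directly to `NB_PATH`).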
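As far as can be told from the traceback, the failing call is papermill's write for the `gs://` path, which goes through the tenacity retry. `/home/airflow/gcs/data`, by contrast, is the local directory where Composer mounts the bucket's `data/` folder, so papermill treats it as an ordinary file path. Below is a minimal sketch of the same path computation using pathlib joins instead of string concatenation. Joining this way avoids a missing `/` between the folder and the file name. `notebook_paths` is a hypothetical helper. It assumes the DAG file sits in `<root>/dags/`, the notebooks are in `<root>/data/`, and the paths are POSIX paths.

```python
from pathlib import PurePosixPath


def notebook_paths(dag_file, input_name, output_name):
    """Return (input, output) notebook paths in the `data` folder beside `dags`.

    Hypothetical helper; assumes a layout like Composer's:
        <root>/dags/<dag file>   and   <root>/data/<notebooks>
    """
    # parents[0] is <root>/dags, parents[1] is <root>
    data_dir = PurePosixPath(dag_file).parents[1] / "data"
    # The "/" operator inserts the separator, unlike plain string concatenation.
    return str(data_dir / input_name), str(data_dir / output_name)


if __name__ == "__main__":
    print(notebook_paths("/home/airflow/gcs/dags/test.py", "test.ipynb", "test_ran.ipynb"))
```

Inside the DAG this would be called as `notebook_paths(os.path.abspath(__file__), "test.ipynb", "test_ran.ipynb")`, and the two results passed to `pm.execute_notebook`.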
Getting this to work also required me to add `jupyter` as a PyPI dependency for papermill to work correctly, but it seems to be working now!
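If papermill still fails after switching paths, you can check from a task which notebook-execution dependencies the worker is missing. The sketch below uses only `importlib.util.find_spec`. The module list is an assumption about what papermill needs to run a Python notebook. Installing the `jupyter` metapackage presumably helped because it brings in `ipykernel`, which provides the `python3` kernel. `missing_modules` is a hypothetical helper.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of top-level module `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Assumed set of modules papermill relies on to execute a Python notebook.
    print(missing_modules(["papermill", "nbformat", "nbconvert", "jupyter_client", "ipykernel"]))
```

Only pass top-level module names. For a dotted name whose parent package is absent, `find_spec` raises `ModuleNotFoundError` instead of returning `None`.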