Tags: python, airflow, etl, sigabrt

Airflow DAG fails when PythonOperator tries to call an API and download data


I'm configuring Airflow on my laptop for the first time (without Docker, just following the documentation). My goal is to set up a simple ETL job.

I've written the simplest possible DAG with one PythonOperator:

from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from view import spotify_etl  # spotify_etl lives in view.py next to this file

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'airflow_dag_tutorial-new',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# Single task running the ETL callable; no dependencies to declare.
run_etl = PythonOperator(
    task_id='main_task',
    python_callable=spotify_etl,
    dag=dag,
)

When I pass a dummy function with a print statement, the DAG runs successfully. But when I pass my function spotify_etl, which calls the Spotify API, the DAG fails.
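For reference, a trivial callable along these lines (the name is just illustrative) runs without any problem:

def dummy_task():
    # Placeholder callable; a bare print works fine as the python_callable
    print("hello from the dummy task")

This is the spotify_etl function that fails: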

import datetime

import requests


def spotify_etl():
    token = 'xxx'  # OAuth access token (redacted)

    headers = {
        'Accept': "application/json",
        'Content-Type': "application/json",
        'Authorization': 'Bearer {token}'.format(token=token)
    }

    # Unix timestamp in milliseconds for the 'after' query parameter
    # (despite the variable name, this goes back 100 days)
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=100)
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

    r = requests.get(
        "https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp),
        headers=headers,
    )
    data = r.json()
    print(data)
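As a sanity check, the callable can also be exercised outside Airflow, e.g. with a hypothetical __main__ guard at the bottom of view.py, to separate API problems from Airflow/process problems:

if __name__ == "__main__":
    # Run the ETL function directly, without the Airflow worker
    spotify_etl()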

The error I get is:

[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT

Does anyone know how to use PythonOperator correctly with a function that calls an API? What is causing this error?

I tried setting export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in my venv (as suggested in Airflow task running tweepy exits with return code -6 and in https://github.com/ansible/ansible/issues/32499#issuecomment-341578864), but that doesn't seem to have fixed it.
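One quick way to check whether the variable actually reaches the task process is to log it from inside the callable (a diagnostic sketch; the function name is just illustrative):

import os

def check_fork_safety_env():
    # Prints None if the export never made it into the worker's environment
    print(os.environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY'))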


Solution

  • It turned out that export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES was not taking effect: it had been added to .bash_profile, but since the default shell was zsh, it had to go in .zshrc instead. That solved it.
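    For anyone hitting the same thing, the steps that worked (assuming zsh is the login shell, as it is by default on recent macOS):

    echo 'export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES' >> ~/.zshrc
    source ~/.zshrc
    # then restart the Airflow scheduler and webserver from this shell
    # so the task processes inherit the variable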