I installed Airflow, both through Apache and Astronomer, and wrote a really simple DAG with two tasks, each of which is a BashOperator that calls a Python script. The first Python script reads a file and writes out a file; the second reads the file created by the first script, reads another hardcoded file, and loads both into a local DB. Nothing too crazy. But with both versions of Airflow (Apache and Astronomer), I'm doing something wrong: the DAG loads, looks okay in the Airflow UI, and runs "successfully", but does nothing at all. Any advice would be much appreciated.
Here's my code and what I tried:
The DAG:
import json
from pendulum import datetime
from airflow.operators.bash import BashOperator
from airflow.models.baseoperator import chain
from airflow.decorators import (
    dag,
    task,
)

PYTHON_SCRIPTS = "python_scripts"


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
    },
)
def parse_json_data():
    @task(
        templates_exts=[".py"],
    )
    def parse_json():
        BashOperator(
            task_id="parse_json_task",
            bash_command=f"python {PYTHON_SCRIPTS}/json_file_parser.py",
        )

    @task(
        templates_exts=[".py"],
    )
    def load_files():
        BashOperator(
            task_id="load_files_task",
            bash_command=f"python {PYTHON_SCRIPTS}/load_data.py",
        )

    parse_json_task = parse_json()
    load_files_task = load_files()

    chain(parse_json_task, load_files_task)


parse_json_data()
The two Python scripts:
json_file_parser.py
import gzip
import json

input_json_file = "../data_files/json_file.jsonl.gz"
parsed_json_file = "../data_files/parsed_json_file.json"


# Parse function to extract necessary data from nhtsa file 1
def parse_json_file():
    # Create an empty list to store the extracted data
    extracted_data = []

    # Open the gzipped file and loop through each line
    with gzip.open(input_json_file, "rt", encoding="utf-8") as jfile:
        # working code here

    # Write the extracted data to a new JSON file
    with open(parsed_json_file, "w", encoding="utf-8") as output_file:
        json.dump(extracted_data, output_file, indent=4)


parse_json_file()
load_data.py
import pandas as pd
from sqlalchemy import create_engine


def load_processed_json_file(engine):
    try:
        nhtsa_df = pd.read_json("../data_files/parsed_json_file.json")
        nhtsa_df.to_sql(
            name="processed_json_data", con=engine, if_exists="replace", index=False
        )
    except FileNotFoundError:
        print("Parsed file not found.")


def load_lookup_file(engine):
    try:
        nhtsa_lookup_df = pd.read_csv("../data_files/data_lookup_file.csv")
        nhtsa_lookup_df.to_sql(
            name="lookup_data", con=engine, if_exists="replace", index=False
        )
    except FileNotFoundError:
        print("Lookup file not found.")


def main():
    db_url = "..."
    engine = create_engine(db_url)
    load_processed_json_file(engine)
    load_lookup_file(engine)


if __name__ == "__main__":
    main()
I tried a myriad of file locations and ways to get this to run, but nothing worked. When using Astronomer, I put the Python scripts in a folder called python_scripts under the include folder and then used template_searchpath, after first having tried the hardcoded route; neither approach worked. I also put the data files in a folder of their own under include, but that didn't help either.
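For reference, the Astronomer variant looked roughly like this (a sketch; the /usr/local/airflow/include path is an assumption based on the default Astro project layout inside the container):

from pendulum import datetime
from airflow.decorators import dag

# Assumed layout:
#   dags/parse_json_dag.py
#   include/python_scripts/json_file_parser.py
#   include/python_scripts/load_data.py
#   include/data_files/...

PYTHON_SCRIPTS = "python_scripts"


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    template_searchpath="/usr/local/airflow/include",
    default_args={"retries": 2},
)
def parse_json_data():
    ...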
For Apache Airflow, I took a look at the example DAGs, which seemed to indicate that I could just create the Python script and data folders inside the example_dags folder and it would work, but it didn't.
To add to this mess, the DAG "runs successfully", yet nothing actually happens. I've used Airflow several times in the past, but this is somehow beyond me. Any help would be great. Thank you!
EDIT: I followed the suggestion below, and it worked. Kinda. It now gives me an error:
can't open file '/private/var/folders/rh/wzc6ln9d67s3n259nvkxq57w0000gn/T/airflowtmpa1sasfg8/python_scripts/json_file_parser.py': [Errno 2] No such file or directory
It seems to be looking for the script but can't find it. I tried setting template_searchpath, giving it an absolute path, and so on, but nothing seems to work.
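For example, one of the absolute-path variants built the script path from the DAG file's own location, roughly like this (a sketch; the include/python_scripts location reflects my layout):

import os

from airflow.operators.bash import BashOperator

# Resolve the scripts folder relative to this DAG file rather than the
# temporary working directory the bash command runs in (assumes the DAG
# file lives in dags/ and the scripts in include/python_scripts/).
SCRIPTS_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "..", "include", "python_scripts"
)

parse_json_task = BashOperator(
    task_id="parse_json_task",
    bash_command=f"python {SCRIPTS_DIR}/json_file_parser.py",
)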
You're defining the operators incorrectly.
@task is another way of defining a PythonOperator, so what you're doing here is defining a PythonOperator that creates a BashOperator but never executes it.
You don't need the @task wrapper; just assign the BashOperators directly, as follows:
parse_json_task = BashOperator(
    task_id="parse_json_task",
    bash_command=f"python {PYTHON_SCRIPTS}/json_file_parser.py",
)

load_files_task = BashOperator(
    task_id="load_files_task",
    bash_command=f"python {PYTHON_SCRIPTS}/load_data.py",
)
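Putting it together, the full DAG would look roughly like this (keeping your PYTHON_SCRIPTS constant; the script path still has to be resolvable from wherever the bash command runs):

from pendulum import datetime

from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator

PYTHON_SCRIPTS = "python_scripts"


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={"retries": 2},
)
def parse_json_data():
    parse_json_task = BashOperator(
        task_id="parse_json_task",
        bash_command=f"python {PYTHON_SCRIPTS}/json_file_parser.py",
    )

    load_files_task = BashOperator(
        task_id="load_files_task",
        bash_command=f"python {PYTHON_SCRIPTS}/load_data.py",
    )

    chain(parse_json_task, load_files_task)


parse_json_data()

For two tasks you can also write parse_json_task >> load_files_task instead of chain(...); they're equivalent.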