amazon-s3, airflow, mwaa

Why does a local-filesystem-to-S3 upload work as a local test but fail when the DAG runs on managed Airflow (MWAA)?


My local workspace has a CSV file, "online_retail.csv", that I want to upload. Goal: upload the file from my local machine to S3.

When I run the DAG below as a local test, it works and uploads the file straight to S3, without the DAG ever appearing in my managed AWS Airflow environment. But when I upload the same DAG file to the S3 bucket backing my managed AWS Apache Airflow (MWAA) environment, the task fails with:

File "/usr/lib/python3.10/genericpath.py", line 50, in getsize
    return os.stat(filename).st_size
FileNotFoundError: [Errno 2] No such file or directory: 'online_retail.csv'
[2023-10-17, 14:43:28 +05] {{taskinstance.py:1345}} INFO - Marking task as FAILED. dag_id=kkkk, task_id=create_local_to_s3_job, execution_date=20231017T094312, start_date=20231017T094327, end_date=20231017T094328

Code below:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import (
    LocalFilesystemToS3Operator,
)
from airflow.utils.dates import days_ago


default_args = {
    "owner": "warhammer",
}

# Instantiate a DAG object
mydag = DAG(
    "kkkk",
    default_args=default_args,
    description="upping to s3!",
    start_date=days_ago(2),
    catchup=False,
    tags=["s3 warhammer"],
)


start_task = EmptyOperator(task_id="start_task", dag=mydag)


create_local_to_s3_job = LocalFilesystemToS3Operator(
    task_id="create_local_to_s3_job",
    filename="online_retail.csv",
    dest_key="myk.csv",
    dest_bucket="kaggle-dataset-bucket",
    replace=True,
    dag=mydag,
)


# Creating third task
end_task = EmptyOperator(task_id="end_task", dag=mydag)

# Set the order of execution of tasks.
start_task >> create_local_to_s3_job >> end_task

# When I run this file directly on my machine, the upload to S3 works.
# When I remove this block and upload the whole dag.py to the S3 DAGs
# bucket, the DAG fails in my managed AWS Airflow environment.
if __name__ == "__main__":
    mydag.test()

It seems that LocalFilesystemToS3Operator only works when launched from my machine, presumably because that is where the actual file lives. When I upload the DAG code to the managed environment, does it start looking for the file in its own local directory and, obviously, fail to find it?

Is it correct that a local-filesystem-to-S3 operator task is not meant to be run from the Airflow webserver, since it looks for a "local file system"?


Solution

  • Yes, you are correct. When the code runs in Managed Workflows for Apache Airflow (MWAA), it runs inside an ECS container that has its own file system. It cannot find a file that exists only on your local computer, hence the No such file or directory error. Note that it is looking for the file on one of the worker containers, not on the webserver, since that is where tasks execute. I recommend reading up on the MWAA architecture and how it functions here.
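If you still want the upload to happen inside MWAA, one common workaround is to ship the CSV alongside the DAG (MWAA syncs the dags/ prefix of the environment's S3 bucket onto the workers) and reference it by an absolute path. Below is a minimal sketch of that idea, not the original poster's code: the /usr/local/airflow/dags location, the dag_id, and the bucket/key names are assumptions.

    import os

    from airflow.models import DAG
    from airflow.providers.amazon.aws.transfers.local_to_s3 import (
        LocalFilesystemToS3Operator,
    )
    from airflow.utils.dates import days_ago

    # MWAA syncs the "dags/" prefix of the environment's S3 bucket onto each
    # worker (typically under /usr/local/airflow/dags). Resolving the path
    # relative to this DAG file avoids depending on the worker's current
    # working directory.
    DAG_DIR = os.path.dirname(os.path.abspath(__file__))

    with DAG(
        "local_to_s3_from_dags_folder",  # hypothetical dag_id
        start_date=days_ago(2),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        upload = LocalFilesystemToS3Operator(
            task_id="upload_bundled_csv",
            # assumes online_retail.csv was uploaded next to this DAG file
            filename=os.path.join(DAG_DIR, "online_retail.csv"),
            dest_key="myk.csv",
            dest_bucket="kaggle-dataset-bucket",  # assumed bucket name
            replace=True,
        )

If the file is large, it is usually simpler to upload it to S3 directly (for example with the AWS CLI) rather than routing it through the worker's file system at all.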