I have a DAG that transfers files from an FTP server to an Azure Blob. I would like to resolve the connections based on parameters when the DAG is triggered. For example, let's say there are two different FTP servers and there is a connection for each already configured. I want to allow the user triggering the DAG run to specify which connection to use when they trigger it.
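```python
from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

with DAG(
    "example_ftp_to_blob",
    default_args=default_args,  # assumed to be defined earlier in the DAG file
    schedule=None,
    catchup=False,
) as dag:
    ftp_get = FTPFileTransmitOperator(
        task_id="ftp_get",
        ftp_conn_id="my-ftp-con",  # hard-coded; this is what should be chosen at trigger time
        local_filepath="/tmp/my-file",
        remote_filepath="/my-file",
        operation=FTPOperation.GET,
        create_intermediate_dirs=True,
    )
    blob_put = LocalFilesystemToWasbOperator(
        task_id="blob_put",
        wasb_conn_id="my_blob_conn",
        file_path="/tmp/my-file",
        container_name="incoming",
        blob_name="my-file",
    )
    ftp_get >> blob_put
```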
Since the params aren't known when the DAG is parsed, I'm not clear on how (or even if) I can pass those values to the operators.
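You normally use Jinja templates to evaluate parameters at runtime, but only the operator attributes listed in its `template_fields` are rendered before the task executes.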
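To see why a non-templated field stays literal, here is a toy, stdlib-only sketch of the mechanism. It is not Airflow's implementation (Airflow renders with Jinja2 and the full task context); `ToyOperator`, `render` and `render_templates` are hypothetical names, and the regex only understands the `{{ params.<name> }}` form.

```python
import re

# Toy stand-in for Jinja: matches "{{ params.<name> }}" placeholders only.
_PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render(value: str, params: dict) -> str:
    """Replace every {{ params.<name> }} placeholder in value with params[name]."""
    return _PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), value)


class ToyOperator:
    # Only attributes named here get rendered, mirroring the idea of template_fields.
    template_fields = ("remote_filepath",)

    def __init__(self, conn_id: str, remote_filepath: str):
        self.conn_id = conn_id
        self.remote_filepath = remote_filepath

    def render_templates(self, params: dict) -> None:
        # Render each templated attribute in place; all other attributes are untouched.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), params))


op = ToyOperator(conn_id="{{ params.ftp_conn_id }}", remote_filepath="/{{ params.name }}")
op.render_templates({"ftp_conn_id": "ftp_server_b", "name": "my-file"})
print(op.remote_filepath)  # /my-file
print(op.conn_id)          # {{ params.ftp_conn_id }}  (not templated, so left as-is)
```

Because `conn_id` is not listed, the operator would try to look up a connection literally named `{{ params.ftp_conn_id }}`, which is exactly the problem with the stock `ftp_conn_id` argument.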
However, the `ftp_conn_id` field of `FTPFileTransmitOperator` is not templated. Fortunately, it's easy to extend the operator; the Airflow docs actually have an example explaining just this, i.e. how to add a templated field to an existing operator:
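```python
from typing import Sequence
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator

class BetterFTPFileTransmitOperator(FTPFileTransmitOperator):  # also templates ftp_conn_id
    template_fields: Sequence[str] = (*FTPFileTransmitOperator.template_fields, "ftp_conn_id")
```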
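Why unpack `*FTPFileTransmitOperator.template_fields` instead of writing just `("ftp_conn_id",)`? A class attribute assigned in a subclass replaces the inherited one, so unpacking the parent's tuple is what keeps the fields that were already templated. The plain-Python sketch below assumes, for illustration, that the parent declares `("local_filepath", "remote_filepath")`; `ParentOperator`, `ExtendedOperator` and `OverwritingOperator` are hypothetical names.

```python
from typing import Sequence


class ParentOperator:
    # Hypothetical stand-in for the parent operator's declaration.
    template_fields: Sequence[str] = ("local_filepath", "remote_filepath")


class ExtendedOperator(ParentOperator):
    # Unpack the parent's tuple so its templated fields survive, then append the new one.
    template_fields: Sequence[str] = (*ParentOperator.template_fields, "ftp_conn_id")


class OverwritingOperator(ParentOperator):
    # Pitfall: a fresh tuple silently drops the inherited templated fields.
    template_fields: Sequence[str] = ("ftp_conn_id",)


print(ExtendedOperator.template_fields)
# ('local_filepath', 'remote_filepath', 'ftp_conn_id')
print(OverwritingOperator.template_fields)
# ('ftp_conn_id',)
```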
Now we are ready to parametrize the connection:
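```python
from airflow import DAG
from airflow.models.param import Param
from airflow.providers.ftp.operators.ftp import FTPOperation

with DAG(
    "example_ftp_to_blob",
    default_args=default_args,
    schedule=None,
    catchup=False,
    # Declares the param and its default; a value passed in the run's conf overrides it
    params={"ftp_conn_id": Param("default_ftp_conn", type="string")},
) as dag:
    ftp_get = BetterFTPFileTransmitOperator(
        task_id="ftp_get",
        # Rendered from the DAG run's params because ftp_conn_id is now templated
        ftp_conn_id="{{ params.ftp_conn_id }}",
        local_filepath="/tmp/my-file",
        remote_filepath="/my-file",
        operation=FTPOperation.GET,
        create_intermediate_dirs=True,
    )
    ...
```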
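With the param declared, the connection ID is picked when the run is triggered: in the UI via "Trigger DAG w/ config", on the command line with `airflow dags trigger example_ftp_to_blob --conf '{"ftp_conn_id": "other_ftp_conn"}'`, or over the REST API. The sketch below only builds (it does not send) a request for Airflow 2's stable REST API endpoint `POST /api/v1/dags/{dag_id}/dagRuns`; the base URL and the `other_ftp_conn` connection ID are hypothetical, and a real call needs whatever authentication your deployment uses (Airflow 3 serves a different API version).

```python
import json
import urllib.request

# Hypothetical deployment values; adjust to your environment.
AIRFLOW_BASE_URL = "http://localhost:8080"
DAG_ID = "example_ftp_to_blob"


def build_trigger_request(base_url: str, dag_id: str, ftp_conn_id: str) -> urllib.request.Request:
    """Build (but do not send) a POST that creates a DAG run with ftp_conn_id in its conf."""
    body = {"conf": {"ftp_conn_id": ftp_conn_id}}
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_trigger_request(AIRFLOW_BASE_URL, DAG_ID, "other_ftp_conn")
print(req.get_method(), req.full_url)
```

With the default `dag_run_conf_overrides_params` setting, a conf key that matches a declared param overrides its default for that run. If only the two configured connections should be accepted, `Param` also takes JSON Schema keywords, so an `enum` listing the two connection IDs could restrict the choice.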