Is there a way to change the task_runner
within a prefect deployment? I would like to have possibility to have for a single flow a deployment with say ConcurrentTaskRunner
and DaskTaskRunner
(local or remote).
The only way I have found so far is to create within deployment:
infra_overrides:
env:
dask_server: True
And on the flow level something like:
def determine_runner():
return DaskTaskRunner if os.environ.get("dask_server") == "True" else ConcurrentTaskRunner
@flow(task_runner=determine_runner())
def my_flow():
pass
This works as in normal run I don't have variable dask_server
and in special deployment run where I set this variable agent starts each run on clean environment with this variable set up. But my guess is that there must be a better way. If there was a solution on deployment level I could have a single function building from flows instead of adding to each flow a function determine_runner
.
Of course it would be best if there was possibility to do something like:
Deployment.build_from_flow(
...
task_runner=my_preferred_runner,
)
Which is not implemented.
You can add environment variables that determine which task runner gets used. This GitHub issue has a detailed explanation but here is a TL;DR:
@flow(
task_runner=DaskTaskRunner()
if os.environ.get("MY_ENV") == "prod"
else ConcurrentTaskRunner(),
)
def my_flow():