The goal is to:
1) Create a set of ops that run models with given parameters (implemented with an op factory).
2) If a model run fails, the remaining models must still run (chaining would cause downstream ops to depend on upstream ones, so as a workaround I set the concurrency limit to 1).
3) Immediately after each model has run, clean the output directory using clean_results_dir.
4) Only after all models have run, even if some of them failed, run the generate_reports op.
Steps (3) and (4) are where I am struggling.
What I see in the Dagster UI when the job runs is odd: model1 runs, then model2, then cleandir1, then cleandir2, and so on. The exact order varies between runs, but it is definitely not what I want.
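For context, get_from_op_factory is roughly this (a simplified sketch, not the exact code; the log call stands in for the real model execution):

from dagster import op

def get_from_op_factory(op_name, model_name, num_paths):
    # Build a uniquely named op that runs a single model with the given parameters.
    @op(name=op_name)
    def _model_op(context):
        # Placeholder for the actual model execution.
        context.log.info(f"Running {model_name} with numPaths={num_paths}")
        return model_name

    return _model_op

The graph and job look like this: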
@graph
def run_all_models_graph():
    model_configs = [
        {"model_name": "my/model/url/", "numPaths": 10},
        {"model_name": "my/model/url_two/", "numPaths": 33},
    ]
    last_op = None
    for config in model_configs:
        model_name = config["model_name"]
        model_name_formatted = model_name.split("/")[-1]
        # The op factory returns an op that runs one model with these parameters.
        model_instance = get_from_op_factory(model_name_formatted, model_name, config["numPaths"])
        # Clean the output directory after each model run.
        last_op = clean_results_dir(model_instance())
    return last_op


@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1
                }
            }
        }
    }
)
def run_all_models_job():
    result = run_all_models_graph()
    generate_reports(start_after=result)
I managed to solve this by 1) using hooks to run the clean-directory logic after each model run (successful or failed), and 2) returning a dummy op from the graph. The dummy op returns a string so it always succeeds, and it is used to enforce the dependency on the generate_reports op.
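In sketch form (simplified; get_from_op_factory and generate_reports are the same as above, _clean_results_dir stands in for the actual cleanup logic, and the job config with max_concurrent: 1 is unchanged):

from dagster import failure_hook, graph, job, op, success_hook


def _clean_results_dir():
    # Placeholder for the actual directory-cleanup logic.
    ...


@success_hook
def clean_dir_on_success(context):
    # Fires right after an op this hook is attached to succeeds.
    _clean_results_dir()


@failure_hook
def clean_dir_on_failure(context):
    # Fires right after an op this hook is attached to fails.
    _clean_results_dir()


@op
def all_models_done():
    # Dummy terminal op: returns a string so it always succeeds, giving
    # generate_reports something to depend on even if model runs failed.
    return "done"


@graph
def run_all_models_graph():
    model_configs = [
        {"model_name": "my/model/url/", "numPaths": 10},
        {"model_name": "my/model/url_two/", "numPaths": 33},
    ]
    for config in model_configs:
        model_name = config["model_name"]
        model_name_formatted = model_name.split("/")[-1]
        model_op = get_from_op_factory(model_name_formatted, model_name, config["numPaths"])
        # Attach both hooks so the output directory is cleaned after every
        # model run, successful or not.
        model_op.with_hooks({clean_dir_on_success, clean_dir_on_failure})()
    return all_models_done()


@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1
                }
            }
        }
    }
)
def run_all_models_job():
    generate_reports(start_after=run_all_models_graph())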