python, etl, pipeline, dagster

Running graph ops sequentially without affecting downstream


The goal is to:

1) Create a set of ops that run models with given parameters (implemented with an op factory).
2) If a model run fails, the remaining models must still run. Chaining the ops would cause downstream ops to depend on upstream ones, so as a workaround the max concurrency limit is set to 1 to force sequential execution without explicit dependencies.
3) Straight after each model has run, clean the output directory using clean_results_dir.
4) Only after all models have run, even if some of them failed, run the generate_reports op.

Points (3) and (4) are where I am struggling.
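For context, the op factory mentioned in (1) is not shown below; a minimal sketch of what such a factory might look like (the op body and argument names are illustrative, not the real implementation) is:

from dagster import op

def get_from_op_factory(op_name: str, model_url: str, num_paths: int):
    # Build a uniquely named op for one model configuration.
    @op(name=op_name)
    def _run_model():
        # Placeholder for the real model execution against model_url.
        return f"ran {model_url} with {num_paths} paths"

    return _run_model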

What I see in the Dagster UI when the job runs is odd: model1 runs, then model2, then cleandir1, then cleandir2 (not necessarily always in this order), which is definitely not what I want: each clean-up should run immediately after its own model.

@graph
def run_all_models_graph():
    model_configs = [
        {"model_name": "my/model/url/", "numPaths": 10},
        {"model_name": "my/model/url_two/", "numPaths": 33},
    ]
    last_op = None
    for config in model_configs:
        model_name = config["model_name"]
        # Use the last path segment as the op name (strip the trailing slash first).
        model_name_formatted = model_name.rstrip("/").split("/")[-1]

        model_instance = get_from_op_factory(model_name_formatted, model_name, config["numPaths"])
        # Chain clean_results_dir so it runs after each model invocation.
        last_op = clean_results_dir(model_instance())
    return last_op




@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1
                }
            }
        }
    }
)
def run_all_models_job():
    result = run_all_models_graph()
    generate_reports(start_after=result)




Solution

  • I managed to solve this by: 1) using hooks to run the clean-directory logic after each model run (both successful and failed), and 2) returning a dummy op from the graph; the dummy op just returns a string, so it always succeeds, and its output is used only to enforce the dependency of generate_reports on the graph. A sketch of this approach is shown below.
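A minimal sketch of that approach, under the following assumptions: success_hook, failure_hook, HookContext and with_hooks are real Dagster APIs, while clean_results_directory, clean_dir_on_success, clean_dir_on_failure, models_done, and the bodies of the factory and report ops are illustrative stand-ins rather than the original code:

from dagster import HookContext, failure_hook, graph, job, op, success_hook

def clean_results_directory():
    # Illustrative stand-in for whatever clean_results_dir does,
    # e.g. delete and recreate the output directory.
    pass

# Hooks fire right after the op they are attached to finishes, on success and
# on failure respectively, so the clean-up no longer has to be a downstream op.
@success_hook
def clean_dir_on_success(context: HookContext):
    clean_results_directory()

@failure_hook
def clean_dir_on_failure(context: HookContext):
    clean_results_directory()

def get_from_op_factory(op_name: str, model_url: str, num_paths: int):
    # Same idea as the factory in the question: one uniquely named op per model.
    @op(name=op_name)
    def _run_model():
        return f"ran {model_url} with {num_paths} paths"
    return _run_model

# Dummy terminal op: it just returns a string, so it always succeeds and exists
# solely to give generate_reports a single upstream output to depend on.
@op
def models_done() -> str:
    return "all model ops submitted"

@op
def generate_reports(start_after: str):
    # Stand-in for the real report generation op.
    pass

model_configs = [
    {"model_name": "my/model/url/", "numPaths": 10},
    {"model_name": "my/model/url_two/", "numPaths": 33},
]

@graph
def run_all_models_graph():
    for config in model_configs:
        model_name = config["model_name"]
        model_op = get_from_op_factory(
            model_name.rstrip("/").split("/")[-1], model_name, config["numPaths"]
        )
        # Attach the clean-up hooks to each model invocation instead of
        # chaining clean_results_dir after it.
        model_op.with_hooks({clean_dir_on_success, clean_dir_on_failure})()
    # Return the dummy op so the graph output can feed generate_reports.
    return models_done()

@job(
    config={"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}}
)
def run_all_models_job():
    generate_reports(start_after=run_all_models_graph())

The two key points are that the hooks fire after each model op whether it succeeds or fails, and that models_done always succeeds, so generate_reports has an upstream output to depend on even when individual model runs fail.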