Tags: python, pipeline, duckdb, dagster, data-engineering

Collecting / joining / waiting for parallel, depth-first ops in Dagster


After a much-appreciated assist from @zyd in this answer on parallel, depth-first execution in Dagster, I am now looking for a way to run an @op on the collected results of the graph run, or at least one that waits until they have all finished, since they don't have hard dependencies per se. My working code is as follows:

from typing import List

from dagster import DynamicOut, DynamicOutput, In, Nothing, graph, job, op


@op
def get_csv_filenames(context) -> List[str]:
    ...

@op(out=DynamicOut())
def generate_subtasks(context, csv_list: List[str]):
    for csv_filename in csv_list:
        yield DynamicOutput(csv_filename, mapping_key=csv_filename)

@op  # no dep since 1st task
def load_csv_into_duckdb(context, csv_filename):
    ...

@op(ins={"start": In(Nothing)})
def transform_dates(context, csv_filename):
    ...

@op(ins={"start": In(Nothing)})
def from_code_2_categories(context, csv_filename):
    ...

@op(ins={"start": In(Nothing)})
def export_2_parquet(context, csv_filename):
    ...

@op(ins={"start": In(Nothing)})
def profile_dataset(context, csv_filename):
    ...

@graph
def process(csv_filename: str):
    task1 = load_csv_into_duckdb(csv_filename=csv_filename)
    task2 = transform_dates(start=task1, csv_filename=csv_filename)
    task3 = from_code_2_categories(start=task2, csv_filename=csv_filename)
    task4 = export_2_parquet(start=task3, csv_filename=csv_filename)
    profile_dataset(start=task4, csv_filename=csv_filename)


@job
def pipeline():
    csv_filename_list = get_csv_filenames()
    generate_subtasks(csv_filename_list).map(process)

I have tried the .map(process).collect() approach, but Dagster complains that NoneType has no attribute collect. However, I've seen several examples of this exact approach online, so it seems like it should work.

I have also tried having the @graph return a list of the individual tasks' return values, but the Dagster UI complains that a graph-decorated function should return a dict with mapping keys. I could build that dict myself, but I feel I should instead pick it up from Dagster's execution context, which I don't know how to access from within the graph function.

Does anyone have some pointers?


Solution

  • Here's an example that worked for me:

    from dagster import Definitions, op, DynamicOutput, graph, GraphOut, DynamicOut
    
    
    # Pass-through stand-ins for the real per-file processing steps
    # (op3 is defined but unused in the_graph below)
    @op
    def a_op(path):
        return path

    @op
    def op2(path):
        return path

    @op
    def op3(path):
        return path
    
    # Fan-out: one dynamic branch per mapping_key
    @op(out=DynamicOut(str))
    def mapper():
        for i in range(10):
            yield DynamicOutput(str(i), mapping_key=str(i))

    # I think what you were missing is returning the output from the graph here
    @graph(out={"out": GraphOut()})
    def nested(path: str):
        return op2(a_op(path))

    # Fan-in: consumes the collected outputs of all mapped branches
    @op
    def consumer(context, paths: list[str]):
        context.log.info(paths)
    
    
    @graph
    def the_graph():
        consumer(mapper().map(nested).collect())
    
    the_job = the_graph.to_job()
    
    defs = Definitions(
        jobs=[the_job],
    )
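
    # Optional sanity check: run the job in-process and assert it succeeded.
    # (execute_in_process is standard Dagster API on a job definition.)
    if __name__ == "__main__":
        result = the_job.execute_in_process()
        assert result.success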
    
    

    A graph is really just an organizational concept for grouping ops; at runtime, nested graphs are flattened into a single graph for the whole job. This means that if you want to use the output of an op inside the nested graph from outside it, you need to return that op's output from the nested graph. Everything else then works the same as any other op -> op dependency.
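
    Applied to the pipeline in the question, the fix would look roughly like the sketch below. This is untested and makes two assumptions beyond the original code: profile_dataset now returns its csv_filename so the graph has an output to collect, and summarize is a hypothetical fan-in op standing in for whatever should run once every branch is done.

    @op(ins={"start": In(Nothing)})
    def profile_dataset(context, csv_filename):
        ...  # profiling work elided, as in the question
        return csv_filename  # assumption: return something for the graph to emit

    # Same pattern as nested above: the graph declares and returns an output
    @graph(out={"out": GraphOut()})
    def process(csv_filename: str):
        task1 = load_csv_into_duckdb(csv_filename=csv_filename)
        task2 = transform_dates(start=task1, csv_filename=csv_filename)
        task3 = from_code_2_categories(start=task2, csv_filename=csv_filename)
        task4 = export_2_parquet(start=task3, csv_filename=csv_filename)
        return profile_dataset(start=task4, csv_filename=csv_filename)

    @op
    def summarize(context, csv_filenames: List[str]):  # hypothetical fan-in op
        context.log.info(f"All branches finished: {csv_filenames}")

    @job
    def pipeline():
        csv_filename_list = get_csv_filenames()
        summarize(generate_subtasks(csv_filename_list).map(process).collect())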