python, pipeline, duckdb, dagster, data-engineering

Parallel, depth-first ops in Dagster with ops, graphs and jobs together


(also posted on r/dagster)

Dagster N00b here.

I have a very specific use-case. My ETL executes the following steps:

  1. Query a DB to get a list of CSV files
  2. Go to a filesystem and, for each CSV file:
     2a. load the CSV into a DuckDB table
     2b. transform the date columns
     2c. map coded columns to categories
     2d. export the table to Parquet
     2e. profile the dataset

The DuckDB tables are named the same as the CSV files for convenience.

Steps 2a through 2e can run in parallel ACROSS CSV files, but within a single CSV file they need to run SERIALLY, in order.

My current code is:

from typing import List

from dagster import DynamicOut, DynamicOutput, op

@op
def get_csv_filenames(context) -> List[str]:
  ...

@op(out=DynamicOut())
def generate_subtasks(context, csv_list: List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename):
  ...

def transform_dates(context, csv_filename):
  ...

def from_code_2_categories(context, csv_filename):
  ...

def export_2_parquet(context, csv_filename):
  ...

Solution

  • If I understand correctly, you want depth-first processing instead of breadth-first? I think you can trigger depth-first processing by mapping a nested graph over the dynamic output step. You're also conceptually missing how to set dependencies between ops in Dagster: an op's return value has to be passed as an input to the next op, and chaining those calls is what defines the execution order. Something like this should work:

    from typing import List

    from dagster import DynamicOut, DynamicOutput, graph, job, op

    @op
    def get_csv_filenames(context) -> List[str]:
      ...

    @op(out=DynamicOut())
    def generate_subtasks(context, csv_list: List[str]):
      for csv_filename in csv_list:
        yield DynamicOutput(csv_filename, mapping_key=csv_filename)

    @op
    def load_csv_into_duckdb(context, csv_filename):
      ...
      return csv_filename

    @op
    def transform_dates(context, csv_filename):
      ...
      return csv_filename

    @op
    def from_code_2_categories(context, csv_filename):
      ...
      return csv_filename

    @op
    def export_2_parquet(context, csv_filename):
      ...
      return csv_filename

    @op
    def profile_dataset(context, csv_filename):
      ...
      return csv_filename

    # Note: a graph body only composes ops, so it takes no context argument.
    @graph
    def process(csv_filename: str):
      profile_dataset(export_2_parquet(from_code_2_categories(transform_dates(load_csv_into_duckdb(csv_filename)))))

    @job
    def pipeline():
      csv_filename_list = get_csv_filenames()
      generate_subtasks(csv_filename_list).map(process)
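
    To try this out locally, one option (a minimal sketch, assuming the op and job names above) is to register the job in a Definitions object and run it. Keep in mind that execute_in_process runs everything serially in a single process (handy for testing), while launching the job through the Dagster UI or CLI uses the default executor, which can run the mapped per-file chains in parallel across processes.

    from dagster import Definitions

    # Expose the job to Dagster tooling (UI/CLI) so it can be launched with
    # an executor that runs the mapped per-file branches in parallel.
    defs = Definitions(jobs=[pipeline])

    if __name__ == "__main__":
      # Quick local check: runs the whole job serially in this process.
      result = pipeline.execute_in_process()
      assert result.success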