(also posted on r/dagster)
Dagster N00b here.
I have a very specific use-case. My ETL executes the following steps:
1. Get the list of CSV files
2a. Load each CSV into DuckDB
2b. Transform the date columns
2c. Map codes to categories
2d. Export to a .parquet file
2e. Profile the dataset
The DuckDB tables are named just the same as the CSV files for convenience.
2a through 2e can be done in parallel FOR EACH CSV FILE. Within the context of a single CSV file, they need to run SERIALLY.
My current code is:
@op
def get_csv_filenames(context) -> List[str]:
    ...

@op(out=DynamicOut())
def generate_subtasks(context, csv_list: List[str]):
    for csv_filename in csv_list:
        yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename):
    ...

def transform_dates(context, csv_filename):
    ...

def from_code_2_categories(context, csv_filename):
    ...

def export_2_parquet(context, csv_filename):
    ...
If I understand correctly, you want depth-first processing instead of breadth-first? I think you can trigger depth-first processing by using a nested graph after the dynamic output step. You're also conceptually missing how to set dependencies between ops in Dagster: an op depends on another op when it takes that op's output as an input, which is why each op below returns csv_filename and passes it along. Something like this should work:
from typing import List

from dagster import DynamicOut, DynamicOutput, graph, job, op


@op
def get_csv_filenames(context) -> List[str]:
    ...

@op(out=DynamicOut())
def generate_subtasks(context, csv_list: List[str]):
    for csv_filename in csv_list:
        # Note: mapping keys may only contain letters, digits, and underscores,
        # so you may need to sanitize the filename here.
        yield DynamicOutput(csv_filename, mapping_key=csv_filename)

# Each per-file op takes the previous op's output as input and returns it again;
# passing outputs as inputs is how Dagster knows to run them serially.
@op
def load_csv_into_duckdb(context, csv_filename):
    ...
    return csv_filename

@op
def transform_dates(context, csv_filename):
    ...
    return csv_filename

@op
def from_code_2_categories(context, csv_filename):
    ...
    return csv_filename

@op
def export_2_parquet(context, csv_filename):
    ...
    return csv_filename

@op
def profile_dataset(context, csv_filename):
    ...
    return csv_filename

# A graph composition function doesn't take a context argument; it only wires ops together.
@graph
def process(csv_filename: str):
    profile_dataset(export_2_parquet(from_code_2_categories(transform_dates(load_csv_into_duckdb(csv_filename)))))

@job
def pipeline():
    csv_filename_list = get_csv_filenames()
    generate_subtasks(csv_filename_list).map(process)
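For completeness, here's a minimal sketch of how the two placeholder ops at the top might be filled in and how you'd kick the job off locally. The directory, database path, and table-naming scheme are my assumptions, not something from your post, and it reuses the pipeline job defined above:

import os
from pathlib import Path
from typing import List

import duckdb
from dagster import op

# Hypothetical paths -- adjust to your layout.
CSV_DIR = "data/raw"
DUCKDB_PATH = "data/etl.duckdb"

@op
def get_csv_filenames(context) -> List[str]:
    # Every CSV in the input directory becomes one dynamic branch.
    filenames = [f for f in os.listdir(CSV_DIR) if f.endswith(".csv")]
    context.log.info(f"Found {len(filenames)} CSV files")
    return filenames

@op
def load_csv_into_duckdb(context, csv_filename):
    # Table name mirrors the CSV file name, as described in the question.
    table = Path(csv_filename).stem
    con = duckdb.connect(DUCKDB_PATH)
    try:
        con.execute(
            f"CREATE OR REPLACE TABLE {table} AS "
            f"SELECT * FROM read_csv_auto('{os.path.join(CSV_DIR, csv_filename)}')"
        )
    finally:
        con.close()
    return csv_filename

if __name__ == "__main__":
    # execute_in_process runs everything in a single process (handy for testing);
    # launching the job through the Dagster UI or CLI uses the multiprocess
    # executor, which is what actually runs the mapped per-file branches in parallel.
    result = pipeline.execute_in_process()

If you need to cap how many files are processed at once, the multiprocess executor also accepts a max_concurrent setting in the run config.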