python dagster

Is it possible to transform one asset into another asset using ops in dagster?


From what I found here, it is possible to use ops and graphs to generate assets.

However, I would like to use an asset as an input for an op. I am exploring this for the following use case:

  1. I fetch a list of country metadata from an external API and store it in my resource:
@dagster.asset
def country_metadata_asset() -> List[Dict]:
    ...
  2. I use this asset to define some downstream assets, for example:
@dagster.asset
def country_names_asset(country_metadata_asset) -> List[str]:
    ...
  3. I would like to use this asset to call another data source, retrieve and validate its data, and then write the result to my resource. The source returns a huge number of rows, so I need to process it in batches, and I thought a graph with ops would be a better fit. I tried something like this:
@dagster.op(out=dagster.DynamicOut())
def load_country_names(country_names_asset):
    for country_index, country_name in enumerate(country_names_asset):
        yield dagster.DynamicOutput(
            country_name, mapping_key=f"{country_index} {country_name}"
        )

@dagster.graph()
def update_data_graph():
    country_names = load_country_names()
    country_names.map(retrieve_and_process_data)


@dagster.job()
def run_update_job():
    update_data_graph()

It seems that my approach does not work, and I am not sure if it is conceptually correct. My questions are:

  1. How do I tell Dagster that the input for load_country_names is an asset? Should I materialise it manually inside the op?

  2. How do I efficiently write the augmented data returned by retrieve_and_process_data to my resource? The data does not fit in memory, so I thought of using a custom IOManager, but I am not sure how to implement it.


Solution

  • It seems to me like the augmented data that's returned from retrieve_and_process_data can (at least in theory) be represented by an asset.

    So we can start from the standpoint that we'd like to create some asset that takes in country_names_asset, as well as the source data asset (the thing that has a bunch of rows in it, which we can call big_country_data_asset for now). I think this models the underlying relationships a bit better, independent of how we're actually implementing things.

    The question then is how to write the computation function for this asset in a way that never requires loading the entire contents of big_country_data_asset into memory. While you could do this with a dynamic graph that you then wrap in a call to AssetsDefinition.from_graph, I think there's an easier approach.

    Dagster lets you bypass the IOManager machinery both when reading an asset as input and when writing an asset as output. When you list an AssetKey in non_argument_deps, you tell Dagster that the asset you're defining depends on that upstream asset, but that its contents will be loaded inside the body of the asset function rather than by Dagster through an IOManager. (Newer Dagster releases deprecate non_argument_deps in favor of the deps parameter, which expresses the same relationship.)

    Similarly, if you annotate the function's return type as None, you tell Dagster that the asset you're defining will be persisted by the logic inside the function rather than by an IOManager.

    Using both these concepts, we can write an asset which at no point needs to have the entire big_country_data_asset loaded.

    from dagster import AssetKey, asset

    @asset(non_argument_deps={AssetKey("big_country_data_asset")})
    def processed_country_data_asset(country_names_asset) -> None:
        for name in country_names_asset:
            # Assumes retrieve_and_process_data stores its own results somewhere
            # and intrinsically knows how to read from big_country_data_asset.
            retrieve_and_process_data(name)
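
The asset above relies on retrieve_and_process_data persisting its own output. One way such a function could keep memory bounded is to pull rows lazily and write them in fixed-size batches. The following is a minimal standard-library sketch under stated assumptions: fetch_rows, the country_data table, the extra conn and batch_size parameters, and the validation rule are all hypothetical stand-ins, not part of Dagster or of the original question.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, int]


def fetch_rows(country_name: str) -> Iterator[Row]:
    """Hypothetical stand-in for the external source; yields rows lazily."""
    for value in range(10):
        yield (country_name, value)


def batched(rows: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield lists of at most `size` rows without materialising the whole input."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def retrieve_and_process_data(
    country_name: str, conn: sqlite3.Connection, batch_size: int = 4
) -> int:
    """Validate and store rows batch by batch; returns the number of rows written."""
    written = 0
    for batch in batched(fetch_rows(country_name), batch_size):
        # Placeholder validation: keep rows with a non-negative value.
        valid = [row for row in batch if row[1] >= 0]
        conn.executemany("INSERT INTO country_data VALUES (?, ?)", valid)
        conn.commit()
        written += len(valid)
    return written


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE country_data (country TEXT, value INTEGER)")
total = sum(retrieve_and_process_data(name, conn) for name in ["France", "Peru"])
```

Only one batch is held in memory at a time, so the peak footprint depends on batch_size rather than on the total number of rows per country.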
    

    IOManagers are a very flexible concept, however, and it is possible to get the same batching behavior while still using them (it is just a bit more convoluted). You'd need to create something like SourceAsset(key="big_country_data_asset", io_manager_def=my_custom_io_manager), where my_custom_io_manager has an unusual load_input method that returns a function instead of data:

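    # Defined as a method on the custom IOManager class.
    def load_input(self, context):
        def _fn(country_name):
            # However you actually get these rows for a single country.
            rows = query_source_data_for_name(country_name)
            return rows
        # Return the loader itself, so rows are only fetched when it is called.
        return _fn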
    

    Then you could define your asset like this:

    from dagster import asset

    @asset
    def processed_country_data_asset(
        country_names_asset, big_country_data_asset
    ) -> None:
        for name in country_names_asset:
            # big_country_data_asset has been loaded as a function,
            # so only one country's rows are fetched at a time.
            rows = big_country_data_asset(name)
            process_data(rows)
    

    You can also handle writing the output of this function in an IOManager using a similar-looking trick: https://github.com/dagster-io/dagster/discussions/9772