dagster

Dagster: making a daily partitioned asset depend on the previous partition


I have two assets: my_asset_a, which runs every day at 4 pm, and my_asset_b, which runs every day at 6 pm. Since my_asset_b runs after my_asset_a, I'd like my_asset_a to depend on the previous partition key of my_asset_b. For example, if the partition_key is 2024-07-08, I'd like my_asset_a to depend on the 2024-07-07 partition of my_asset_b. The reason is that I'd like the flexibility to use my_asset_b as a dependency on both T and T-1.

How can I set an asset to depend on a previous partition?

@asset(partitions_def=daily_partitions_def)
def my_asset_a(context, my_asset_b):
    dt = context.partition_key
    return run_a(dt, my_asset_b)

@asset(partitions_def=daily_partitions_def)
def my_asset_b(context):
    dt = context.partition_key
    return run_b(dt)

Solution

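  • For this, you can use the ins parameter and give the AssetIn a TimeWindowPartitionMapping whose start_offset and end_offset are both -1. The ins entry declares the dependency of my_asset_a on my_asset_b and lets you use my_asset_b inside the my_asset_a function. With both offsets at -1, the 2024-07-08 partition of my_asset_a reads the 2024-07-07 partition of my_asset_b.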

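    from dagster import (
        AssetIn,
        DailyPartitionsDefinition,
        TimeWindowPartitionMapping,
        asset,
    )

    # Assumption: the question does not show this definition, so the
    # start date here is only a placeholder.
    daily_partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

    # run_a and run_b are the asker's own functions and are not defined here.

    @asset(
        partitions_def=daily_partitions_def,
        ins={
            # Shift the upstream window back by one day: partition T of
            # my_asset_a reads partition T-1 of my_asset_b.
            "my_asset_b": AssetIn(
                partition_mapping=TimeWindowPartitionMapping(
                    start_offset=-1, end_offset=-1
                ),
            )
        },
    )
    def my_asset_a(context, my_asset_b):
        dt = context.partition_key
        return run_a(dt, my_asset_b)

    @asset(partitions_def=daily_partitions_def)
    def my_asset_b(context):
        dt = context.partition_key
        return run_b(dt)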
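
To make the offsets concrete, here is a minimal standard-library sketch of the date arithmetic behind them for daily partitions. It is not Dagster code: upstream_keys is a hypothetical helper written only to illustrate which upstream partition keys a given pair of offsets selects.

```python
from datetime import date, timedelta


def upstream_keys(partition_key: str, start_offset: int, end_offset: int) -> list[str]:
    """Hypothetical helper: list the daily upstream keys selected by the offsets.

    Mirrors the idea of shifting the start and end of a one-day window
    by start_offset and end_offset days (both inclusive).
    """
    day = date.fromisoformat(partition_key)
    return [
        (day + timedelta(days=offset)).isoformat()
        for offset in range(start_offset, end_offset + 1)
    ]


# Both offsets at -1: only the previous day (T-1).
print(upstream_keys("2024-07-08", -1, -1))  # ['2024-07-07']

# start_offset=-1, end_offset=0: the previous day and the same day (T-1 and T).
print(upstream_keys("2024-07-08", -1, 0))  # ['2024-07-07', '2024-07-08']
```

If you want both T and T-1 from my_asset_b, as the question mentions, start_offset=-1 with end_offset=0 selects both partitions. How the two upstream partitions are then passed into my_asset_a depends on the IO manager in use.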