
Getting the current execution date in a task or asset in Dagster


Is there an easier way to get the current date in a Dagster asset than what I'm currently doing?

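from datetime import datetime

from dagster import asset

@asset
def current_dt():
    return datetime.today().strftime('%Y-%m-%d')

@asset
def my_task(current_dt):
    # Dagster resolves this parameter as a dependency on the upstream asset named current_dt
    return current_dt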

In Airflow, these values are passed to the Python callable by default, for example: def my_task(ds, **kwargs):
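One detail worth keeping in mind: Airflow's ds is the run's logical (execution) date formatted as YYYY-MM-DD, not the wall-clock date that datetime.today() returns, so the two differ whenever a run is retried or backfilled. The stdlib-only sketch below just illustrates the format; airflow_style_ds and airflow_style_ds_nodash are hypothetical helpers, not Airflow API.

```python
from datetime import datetime


def airflow_style_ds(logical_date: datetime) -> str:
    """Hypothetical helper: format a logical date like Airflow's `ds` (YYYY-MM-DD)."""
    return logical_date.strftime("%Y-%m-%d")


def airflow_style_ds_nodash(logical_date: datetime) -> str:
    """Hypothetical helper: same date without dashes, like Airflow's `ds_nodash` (YYYYMMDD)."""
    return logical_date.strftime("%Y%m%d")


# The logical date of a run, which need not be today's date
print(airflow_style_ds(datetime(2020, 1, 5)))         # 2020-01-05
print(airflow_style_ds_nodash(datetime(2020, 1, 5)))  # 20200105
```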


Solution

  • In Dagster, the typical way to handle work that would rely on Airflow's execution_date is with partitions:

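    from dagster import (
        DailyPartitionsDefinition,
        Definitions,
        asset,
        build_schedule_from_partitioned_job,
        define_asset_job,
    )
    
    # One partition per day, starting on 2020-01-01
    partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")
    
    @asset(partitions_def=partitions_def)
    def my_asset(context):
        # Start (a datetime) of the day this run is materializing, i.e. the
        # partition's "execution date" rather than the wall-clock date
        current_dt = context.asset_partitions_time_window_for_output().start
        return current_dt.strftime("%Y-%m-%d")
    
    # Job that materializes my_asset one partition per run
    my_job = define_asset_job("my_job", selection=[my_asset], partitions_def=partitions_def)
    
    defs = Definitions(
        assets=[my_asset],
        # Schedule derived from the job's daily partitions
        schedules=[build_schedule_from_partitioned_job(my_job)],
    )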
    

    This sets up a schedule that materializes each daily partition once that day has ended. You can also launch runs for individual partitions, or launch backfills that materialize a range of partitions.
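To make the partition model concrete, here is a stdlib-only sketch (no Dagster required) of what a daily partitioning scheme amounts to: one YYYY-MM-DD key per completed day since start_date (the same format as Airflow's ds), each key covering a half-open one-day time window, and a backfill being a contiguous range of keys. The helper names are hypothetical, and excluding the current, still-in-progress day is an assumption meant to mirror DailyPartitionsDefinition's default behavior, not a description of Dagster internals.

```python
from datetime import date, datetime, timedelta
from typing import List, Optional, Tuple


def daily_partition_keys(start_date: str, today: Optional[date] = None) -> List[str]:
    """Hypothetical helper: one "YYYY-MM-DD" key per completed day, from start_date
    up to but not including `today` (the in-progress day is assumed to be excluded)."""
    if today is None:
        today = date.today()
    day = datetime.strptime(start_date, "%Y-%m-%d").date()
    keys = []
    while day < today:
        keys.append(day.isoformat())
        day += timedelta(days=1)
    return keys


def time_window(partition_key: str) -> Tuple[datetime, datetime]:
    """Hypothetical helper: half-open window [start, end) covered by a daily key."""
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    return start, start + timedelta(days=1)


def backfill_keys(keys: List[str], first: str, last: str) -> List[str]:
    """Hypothetical helper: keys in the inclusive range [first, last].
    ISO-formatted dates sort correctly as plain strings."""
    return [k for k in keys if first <= k <= last]


keys = daily_partition_keys("2020-01-01", today=date(2020, 1, 4))
print(keys)  # ['2020-01-01', '2020-01-02', '2020-01-03']
print(backfill_keys(keys, "2020-01-02", "2020-01-03"))  # ['2020-01-02', '2020-01-03']
```

In this model, the end-of-day scheduled run corresponds to the newest key in the list, and the start of that key's time window plays the role Airflow's ds plays.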