Tags: python, parquet, dagster

How do I implement an io_manager that takes a parameter at the asset level?


I am a bit confused about the usage of resources and configuration, and how they are linked to a context and an asset.

So I have a parquet IO manager that can handle both partitioned and non-partitioned datasets. To do so, I check for the presence of a partition on the context in the self._get_path() method and build a unique name for each file, using the asset's key and a date-formatted partition.

# from dagster examples
if context.has_asset_partitions:
    end = context.asset_partitions_time_window

Now I have an issue when the same asset is used with different partition sizes, because the file names are not necessarily the same when reading as when writing. E.g. I have some 1h-partitioned assets and some 1d-partitioned assets using the same base asset.

The solution to this, IMO, is to use the filters kwarg of pandas.read_parquet, which would let me load only the data inside the partition's time window. So I want to provide a string parameter to my IO manager telling it which column to use to filter on the partition interval.
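For reference, filters expects a list of (column, operator, value) tuples. A minimal sketch of the filter described here, with a made-up column name and window:

```python
import datetime as dt

def partition_filters(time_column, start, end):
    """Build a pandas.read_parquet `filters` list keeping only rows whose
    `time_column` falls inside the half-open window [start, end)."""
    return [(time_column, '>=', start), (time_column, '<', end)]

# Hypothetical 1h partition window
start = dt.datetime(2023, 5, 1, 10, 0)
end = dt.datetime(2023, 5, 1, 11, 0)

filters = partition_filters('timestamp', start, end)
# Each tuple is (column, operator, value); pandas forwards them to the
# parquet engine, which skips row groups outside the window.
```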

This parameter is obviously linked to an asset.

I could add this as a parameter of my io_manager constructor and create one io_manager instance per distinct column name. But I find that cumbersome, and my intuition tells me I should be using the InputContext to retrieve this information (the same way I use the context to get the start/end of the partition).

So maybe I should create a ConfigurableResource with a single string attribute (the time column's name), instantiate one object per distinct column name, and provide it to the asset construction (via required_resource_keys?). If this is the right solution, how can I access the resource in the io_manager?

Or is there any other parameter of the asset constructor that I should be using to achieve what I want?


Solution

  • With the following IO manager:

    import datetime as dt
    from pathlib import Path
    from typing import Any, List, Tuple

    import pandas
    import pytz
    from dagster import InputContext, IOManager


    class PartitionedParquetIOManager(IOManager):
        def __init__(self, base_path: str, time_column: str = 'timestamp'):
            self._base_path = Path(base_path)
            self.time_column = time_column

        def load_input(self, context: InputContext) -> pandas.DataFrame:
            path = self._get_path(context)
            filters = self._get_filters(context)

            context.log.debug(f'Reading from {path} with filters {filters}')

            return pandas.read_parquet(path, filters=filters)

        def _get_filters(self, context: InputContext) -> List[Tuple[str, str, Any]]:
            filters = []
            start, end = context.asset_partitions_time_window

            # weird casting to be able to use those in a filter
            start = dt.datetime.fromtimestamp(start.timestamp(), pytz.timezone(start.tz.name))
            end = dt.datetime.fromtimestamp(end.timestamp(), pytz.timezone(end.tz.name))

            filters.extend([(self.time_column, '>=', start), (self.time_column, '<', end)])
            return filters
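(_get_path is referenced above but not shown. A plausible sketch of the window-based naming scheme described in the question, written as a plain helper; the format string and file layout are assumptions, not dagster's:)

```python
import datetime as dt
from pathlib import Path

def partitioned_path(base_path, asset_key, start=None, end=None):
    """Sketch of a _get_path-style naming scheme: one folder per asset,
    with the partition window baked into the file name when present.
    The '%Y%m%d%H%M%S' format is an assumption."""
    base = Path(base_path).joinpath(*asset_key)
    if start is not None and end is not None:
        fname = f"{start:%Y%m%d%H%M%S}_{end:%Y%m%d%H%M%S}.parquet"
    else:
        fname = "data.parquet"
    return base / fname

# Hypothetical 6h partition of an asset named my_custom_df
p = partitioned_path('/tmp/data', ['my_custom_df'],
                     dt.datetime(2023, 5, 1, 0, 0), dt.datetime(2023, 5, 1, 6, 0))
```

Because the name is derived from the window, a 1h reader and a 1d reader look for different files, which is why load_input falls back to filtering on the time column instead.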
    

    A factory function needs to be created so the IO manager can be used with @asset(io_manager_def=...):

    
    import os

    from dagster import Field, io_manager
    from dagster._utils import get_system_temp_directory  # dagster.utils in older releases


    @io_manager(
        config_schema={"data_path": Field(str, is_required=False), "time_column": Field(str, is_required=False)},
    )
    def local_partitioned_parquet_io_manager(init_context):
        fallback_path = os.getenv('DAGSTER_ROOT_DATA_FOLDER', get_system_temp_directory())
        return PartitionedParquetIOManager(
            base_path=init_context.resource_config.get("data_path", fallback_path),
            time_column=init_context.resource_config.get('time_column', 'timestamp')
        )
    

    Then the assets can be created with:

    
    import random

    import pandas as pd
    import pytz
    from dagster import asset


    @asset(io_manager_def=local_partitioned_parquet_io_manager.configured({'time_column': 'timestamp'}),
           partitions_def=six_hours_partitions_30h_offset)
    def my_custom_df(context) -> pd.DataFrame:
        start, end = context.asset_partitions_time_window_for_output()

        df = pd.DataFrame({'timestamp': pd.date_range(start, end, freq='5T', tz=pytz.timezone('Europe/Brussels'))})
        df['count'] = df['timestamp'].map(lambda _: random.randint(1, 1000))
        return df


    @asset(io_manager_def=local_partitioned_parquet_io_manager,
           partitions_def=daily_partitions)
    def another_custom_df(context, my_custom_df: pd.DataFrame) -> pd.DataFrame:
        return my_custom_df.set_index('timestamp').resample('H').agg({'count': 'mean'}).reset_index()
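To make the downstream aggregation concrete, here is the resample('H') step from another_custom_df applied to a tiny hand-made frame (timestamps and counts are made up):

```python
import pandas as pd

df = pd.DataFrame({
    'timestamp': pd.date_range('2023-05-01 00:00', periods=4, freq='30min'),
    'count': [1, 3, 5, 7],
})

# Same aggregation as another_custom_df: mean count per hour
hourly = df.set_index('timestamp').resample('H').agg({'count': 'mean'}).reset_index()
# Hour 00 averages rows 00:00 and 00:30 -> 2.0; hour 01 averages 01:00 and 01:30 -> 6.0
```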