I am a bit confused about how resources and configuration work, and how they are linked to a context and an asset.
I have a parquet IO manager that can handle both partitioned and non-partitioned datasets. To do so, I check for the presence of a partition on the context in the self._get_path()
method and build a unique name for each file from the asset's key and a date-formatted version of the partition.
# from dagster examples
if context.has_asset_partitions:
    end = context.asset_partitions_time_window
Now I run into an issue when the same asset is used with different partition sizes, because the file names are not necessarily the same when reading and writing the files. E.g. I have some 1h-partitioned assets and some 1d-partitioned assets built on the same base asset.
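The mismatch can be reproduced with plain datetime formatting (a sketch; partition_file_name is a hypothetical stand-in for what self._get_path() produces):

```python
import datetime as dt


def partition_file_name(asset_key: str, start: dt.datetime, end: dt.datetime) -> str:
    # hypothetical naming scheme: asset key plus date-formatted window bounds
    return f"{asset_key}_{start:%Y%m%d%H%M}_{end:%Y%m%d%H%M}.parquet"


day = dt.datetime(2023, 1, 1)
# name written by the upstream 1h-partitioned run
hourly = partition_file_name('base_df', day, day + dt.timedelta(hours=1))
# name looked up by the downstream 1d-partitioned run
daily = partition_file_name('base_df', day, day + dt.timedelta(days=1))
# hourly != daily: the daily reader never finds the hourly writer's file
```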
The solution to this, IMO, is to use the filters kwarg of pandas.read_parquet, which would let me read only the data inside the partition's time window.
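Concretely, the filters value I would build looks like this (a minimal sketch; partition_filters is a hypothetical helper, and the datetimes stand in for the partition's time window):

```python
import datetime as dt


def partition_filters(time_column: str, start: dt.datetime, end: dt.datetime):
    # keep only the rows inside the partition's time window
    return [(time_column, '>=', start), (time_column, '<', end)]


filters = partition_filters('timestamp', dt.datetime(2023, 1, 1), dt.datetime(2023, 1, 2))
# then: pandas.read_parquet(path, filters=filters)
```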
So I want to provide a string parameter to my IO manager so that it knows which column to use to filter on the partition interval.
This parameter is obviously tied to a specific asset.
I could add it as a parameter of my io_manager constructor and create one io_manager instance per distinct column name, but I find that cumbersome, and my intuition tells me I should be using the InputContext to retrieve this information (the same way I use the context to get the start/end of the partition).
So maybe I should create a ConfigurableResource with a single string attribute (the time column's name), instantiate one object per distinct column name, and provide it to the asset construction (via required_resource_keys?). If that is the right solution, how can I access the resource from the io_manager?
Or is there another parameter of the asset constructor that I should be using to achieve this?
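To make the intent concrete, the lookup I would like the IO manager to perform boils down to this pure-Python sketch (the dict here stands in for whatever per-asset information the context exposes; resolve_time_column is a hypothetical name):

```python
def resolve_time_column(asset_info: dict, default: str = 'timestamp') -> str:
    # read the column name attached to the asset,
    # falling back to a sensible default when none is provided
    return asset_info.get('time_column', default)


resolve_time_column({'time_column': 'event_time'})  # 'event_time'
resolve_time_column({})                             # 'timestamp'
```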
With the following IO manager:
import datetime as dt
from pathlib import Path
from typing import Any, List, Tuple

import pandas
import pytz
from dagster import InputContext, IOManager


class PartitionedParquetIOManager(IOManager):
    def __init__(self, base_path: str, time_column: str = 'timestamp'):
        self._base_path = Path(base_path)
        self.time_column = time_column

    def load_input(self, context: InputContext) -> pandas.DataFrame:
        path = self._get_path(context)
        filters = self._get_filters(context)
        context.log.debug(f'Reading from {path} with filters {filters}')
        return pandas.read_parquet(path, filters=filters)

    def _get_filters(self, context: InputContext) -> List[Tuple[str, str, Any]]:
        filters = []
        start, end = context.asset_partitions_time_window
        # weird casting to be able to use those in a filter
        start = dt.datetime.fromtimestamp(start.timestamp(), pytz.timezone(start.tz.name))
        end = dt.datetime.fromtimestamp(end.timestamp(), pytz.timezone(end.tz.name))
        filters.extend([(self.time_column, '>=', start), (self.time_column, '<', end)])
        return filters

    # handle_output and _get_path omitted for brevity
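About the casting above: the window bounds come back as tz-aware timestamp objects that the filter evaluation did not accept directly, hence the round-trip through timestamp(). That round-trip preserves both the instant and the timezone, which a stdlib-only sketch can show (zoneinfo stands in for pytz here):

```python
import datetime as dt
from zoneinfo import ZoneInfo  # stdlib stand-in for pytz

tz = ZoneInfo('Europe/Brussels')
aware = dt.datetime(2023, 1, 1, 12, 0, tzinfo=tz)

# rebuild the value as a plain datetime at the same instant, in the same zone
plain = dt.datetime.fromtimestamp(aware.timestamp(), tz)

assert plain == aware and plain.tzinfo is not None
```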
A factory function needs to be created so it can be used in @asset(io_manager_def=...):
import os

from dagster import Field, io_manager
from dagster._utils import get_system_temp_directory  # import path may vary across dagster versions


@io_manager(
    config_schema={"data_path": Field(str, is_required=False), "time_column": Field(str, is_required=False)},
)
def local_partitioned_parquet_io_manager(init_context):
    fallback_path = os.getenv("DAGSTER_ROOT_DATA_FOLDER", get_system_temp_directory())
    return PartitionedParquetIOManager(
        base_path=init_context.resource_config.get("data_path", fallback_path),
        time_column=init_context.resource_config.get("time_column", "timestamp"),
    )
Then the asset can be created with:
@asset(io_manager_def=local_partitioned_parquet_io_manager.configured({'time_column': 'timestamp'}),
       partitions_def=six_hours_partitions_30h_offset)
def my_custom_df(context) -> pd.DataFrame:
    start, end = context.asset_partitions_time_window_for_output()
    df = pd.DataFrame({'timestamp': pd.date_range(start, end, freq='5T', tz=pytz.timezone('Europe/Brussels'))})
    df['count'] = df['timestamp'].map(lambda _: random.randint(1, 1000))
    return df
@asset(io_manager_def=local_partitioned_parquet_io_manager,
       partitions_def=daily_partitions)
def another_custom_df(context, my_custom_df: pd.DataFrame) -> pd.DataFrame:
    return my_custom_df.set_index('timestamp').resample('H').agg({'count': 'mean'}).reset_index()