Tags: python, pipeline, mlops, feature-store, mlrun

Issue with the aggregation function in the pipeline during online ingest


I see an issue with the aggregation function (part of the pipeline) during online ingest: the aggregation output is invalid (it differs from what I expect; I get the value 0 instead of 6). The pipeline is very simple.


Here is the relevant part of the code (Python and MLRun):

import datetime
import mlrun
import mlrun.feature_store as fstore
import pandas as pd
from mlrun.datastore.targets import ParquetTarget, NoSqlTarget

# Prepare data, four columns key0, key1, fn1, sysdate
data = {"key0": [1, 1, 1, 1, 1, 1], "key1": [0, 0, 0, 0, 0, 0], "fn1": [1, 1, 2, 3, 1, 0],
        "sysdate": [datetime.datetime(2021, 1, 1, 1), datetime.datetime(2021, 1, 1, 1),
                    datetime.datetime(2021, 1, 1, 1), datetime.datetime(2021, 1, 1, 1),
                    datetime.datetime(2021, 1, 1, 1), datetime.datetime(2021, 1, 1, 1)]}

# Build the DataFrame that is passed to fstore.ingest() below
input_df = pd.DataFrame(data)

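# Create project and featureset with NoSqlTarget & ParquetTarget
# (featureGetOrCreate is my own helper; its definition is not shown here)
project_name = "jist-agg"
project = mlrun.get_or_create_project(project_name, context='./', user_project=False)
feature_set = featureGetOrCreate(True, project_name, 'sample')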

# Add easy aggregation 'agg1'
feature_set.add_aggregation(name='fn1',column='fn1',operations=['count'],windows=['60d'],step_name="agg1")

# Ingest data to the on-line and off-line targets
output_df=fstore.ingest(feature_set, input_df, overwrite=True, infer_options=fstore.InferOptions.default())

# Read data from online source
svc=fstore.get_online_feature_service(fstore.FeatureVector("my-vec", ["sample.*"], with_indexes=True))
resp = svc.get([{"key0": 1, "key1":0} ])

# Output validation
assert resp[0]['fn1_count_60d'] == 6.0, 'Mistake in solution'

Do you see the mistake?


Solution

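  • The code itself is valid; the issue is a matter of knowing how the aggregation behaves ;-).

    The key point is that aggregation for the online target works from 1970-01-01T00:00:00Z up to a specific day, with the time window as the step/epoch (this is the behavior of MLRun 1.2.1). With a 60-day window, the events dated 2021 are outside the window being evaluated, which is why the count comes back as 0.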

    You have two possible solutions:

    1. Change the input data (move the dates from 2021 to 2023)

    # Prepare data, four columns key0, key1, fn1, sysdate
    data = {"key0":[1,1,1,1,1,1], "key1":[0,0,0,0,0,0],"fn1":[1,1,2,3,1,0],
                "sysdate":[datetime.datetime(2023,1,1,1), datetime.datetime(2023,1,1,1),
                datetime.datetime(2023,1,1,1), datetime.datetime(2023,1,1,1),
                datetime.datetime(2023,1,1,1), datetime.datetime(2023,1,1,1)]}
    

    or

    2. Extend the aggregation window (e.g. 3 years = ~1095 days)

    # Add easy aggregation 'agg1'
    feature_set.add_aggregation(name='fn1',column='fn1',operations=['count'],windows=['1095d'],step_name="agg1")
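
To see why both fixes help, here is a minimal plain-Python sketch of a time-window count. It is not MLRun's implementation: it ignores the epoch-aligned steps mentioned above and simply checks whether each event falls within the window that ends at the query time. The helper `count_in_window` and the query time of 2023-02-01 are assumptions made only for illustration.

```python
import datetime


def count_in_window(event_times, query_time, window_days):
    """Count events t with (query_time - window) < t <= query_time."""
    window_start = query_time - datetime.timedelta(days=window_days)
    return sum(1 for t in event_times if window_start < t <= query_time)


# Hypothetical query time, fixed so that the example is deterministic
query_time = datetime.datetime(2023, 2, 1)

# Six identical timestamps, as in the question and in solution 1
events_2021 = [datetime.datetime(2021, 1, 1, 1)] * 6
events_2023 = [datetime.datetime(2023, 1, 1, 1)] * 6

# Original setup: 2021 data, 60-day window -> nothing falls inside the window
count_2021_60d = count_in_window(events_2021, query_time, 60)

# Solution 1: move the data to 2023, keep the 60-day window
count_2023_60d = count_in_window(events_2023, query_time, 60)

# Solution 2: keep the 2021 data, extend the window to 1095 days
count_2021_1095d = count_in_window(events_2021, query_time, 1095)

print(count_2021_60d, count_2023_60d, count_2021_1095d)  # 0 6 6
```

Under these assumptions the original setup counts 0 events, while either fix brings all six events back into the window. Note that solution 1 stops working once the test data is more than 60 days old again, so solution 2 (or generating timestamps relative to the current time) is the more durable choice for a test.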