I see an issue with the aggregation step of my pipeline during online ingest: the aggregation output is wrong (I get the value 0 instead of the expected 6). The pipeline is really very simple.
Here is the relevant part of the code (Python and MLRun):
import datetime
import pandas as pd
import mlrun
import mlrun.feature_store as fstore
from mlrun.datastore.targets import ParquetTarget, NoSqlTarget
# Prepare data, four columns key0, key1, fn1, sysdate
data = {"key0":[1,1,1,1,1,1], "key1":[0,0,0,0,0,0],"fn1":[1,1,2,3,1,0],
"sysdate":[datetime.datetime(2021,1,1,1), datetime.datetime(2021,1,1,1),
datetime.datetime(2021,1,1,1), datetime.datetime(2021,1,1,1),
datetime.datetime(2021,1,1,1), datetime.datetime(2021,1,1,1)]}
# Build the DataFrame that is passed to ingest() below
input_df = pd.DataFrame(data)
# Create project and feature set with NoSqlTarget & ParquetTarget
project = mlrun.get_or_create_project("jist-agg",context='./', user_project=False)
# featureGetOrCreate and project_name are defined elsewhere (not shown here)
feature_set=featureGetOrCreate(True,project_name, 'sample')
# Add easy aggregation 'agg1'
feature_set.add_aggregation(name='fn1',column='fn1',operations=['count'],windows=['60d'],step_name="agg1")
# Ingest data to the on-line and off-line targets
output_df=fstore.ingest(feature_set, input_df, overwrite=True, infer_options=fstore.InferOptions.default())
# Read data from online source
svc=fstore.get_online_feature_service(fstore.FeatureVector("my-vec", ["sample.*"], with_indexes=True))
resp = svc.get([{"key0": 1, "key1":0} ])
# Output validation
assert resp[0]['fn1_count_60d'] == 6.0, 'Mistake in solution'
Do you see the mistake?
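The code itself is valid; the issue is a matter of knowing how the aggregation behaves ;-).
The key point is that aggregation for the online target counts its windows from 1970-01-01T00:00:00Z up to a specific day, with the time window as the step (this is the behavior of MLRun 1.2.1). The online service evaluates the window relative to the time of the request, not the timestamps of the data, so with a 60-day window the events dated January 2021 are no longer inside it and the count is 0.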
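To make the reasoning concrete, here is a minimal sketch in plain Python (no MLRun involved). The function `in_window` and the query time of 2023-01-15 are my own assumptions for illustration, not MLRun internals; the sketch ignores the epoch-aligned stepping and only checks whether an event still lies inside a look-back window that ends at the time of the request.

```python
from datetime import datetime, timedelta

def in_window(event_time, query_time, window):
    """True if event_time lies in the window of length `window` ending at query_time."""
    return query_time - window < event_time <= query_time

# Hypothetical request time in early 2023 (an assumption for illustration)
query_time = datetime(2023, 1, 15)
old_event = datetime(2021, 1, 1, 1)  # timestamp used in the question
new_event = datetime(2023, 1, 1, 1)  # timestamp used in solution 1

print(in_window(old_event, query_time, timedelta(days=60)))    # False
print(in_window(new_event, query_time, timedelta(days=60)))    # True
print(in_window(old_event, query_time, timedelta(days=1095)))  # True
```

The first line corresponds to the failing assert (nothing to count), the second and third to the two solutions.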
You have two possible solutions:
1. Change the input data (move the dates from 2021 to 2023)
# Prepare data, four columns key0, key1, fn1, sysdate
data = {"key0":[1,1,1,1,1,1], "key1":[0,0,0,0,0,0],"fn1":[1,1,2,3,1,0],
"sysdate":[datetime.datetime(2023,1,1,1), datetime.datetime(2023,1,1,1),
datetime.datetime(2023,1,1,1), datetime.datetime(2023,1,1,1),
datetime.datetime(2023,1,1,1), datetime.datetime(2023,1,1,1)]}
or
2. Extend the aggregation window so that it covers the data (e.g. 3 years, about 1095 days)
# Add easy aggregation 'agg1'
feature_set.add_aggregation(name='fn1',column='fn1',operations=['count'],windows=['1095d'],step_name="agg1")
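If you go with solution 2 and do not want to guess the number of days, a rough way to size the window is to measure the distance from the oldest event to the time of the request and round up. This is only a sketch: `window_covering` is a hypothetical helper of mine (not part of MLRun), the request time is an assumed example value, and in practice you would probably add some safety margin, because the exact boundary handling inside MLRun may differ.

```python
import math
from datetime import datetime, timedelta

def window_covering(oldest_event, query_time):
    """Smallest whole number of days reaching back from query_time to oldest_event,
    formatted like the window strings used above (e.g. '60d')."""
    days = math.ceil((query_time - oldest_event) / timedelta(days=1))
    return f"{days}d"

# Oldest timestamp from the question, hypothetical request time in early 2023
print(window_covering(datetime(2021, 1, 1, 1), datetime(2023, 1, 15)))  # 744d
```

The resulting string can then be used in place of '1095d' in the windows list of add_aggregation.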