
Issue with using two different engines to ingest into one ParquetTarget


I tried to ingest with two different engines (e.g. storey and pandas) into the same ParquetTarget, but I could not work out how. My tests did not succeed.

Creating one ingest (with one engine) that writes to two different targets (parallel ingestion) via feature_set.set_targets is not a problem. It works, as this sample shows, but I need a different solution:

import mlrun
import mlrun.feature_store as fstore

# one feature set (one engine) writing to two targets in parallel
project = mlrun.get_or_create_project(name, context='./', user_project=False)
feature_set = fstore.FeatureSet(feature_name, entities=[fstore.Entity("km", description='km description')], engine="storey")
feature_set.set_targets(targets=[mlrun.datastore.ParquetTarget(), mlrun.datastore.NoSqlTarget()], with_defaults=False)
...
fstore.ingest(feature_set, frm)

BTW: The main motivation is to save space on the v3io file system when two different processes write to the same ParquetTarget.


Solution

  • I see a solution based on these principles:

    1. If you need two different engines, you have to use two different feature sets.
    2. Each feature set gets a target of the same type (here ParquetTarget).
    3. Each of these targets refers to the same storage (the same Parquet target path).

    See the sample:

    import mlrun
    import mlrun.feature_store as fstore
    import numpy
    import pandas

    # first feature set, engine 'storey'
    fs_01 = fstore.FeatureSet("fs01", entities=[fstore.Entity("km")],
                              engine="storey")
    fs_01.set_targets(targets=[mlrun.datastore.ParquetTarget(name='s01', path="v3io:///projects/tst2/", partitioned=False)], with_defaults=False)
    fs_01.save()

    # second feature set, engine 'pandas', same target path as the first one
    fs_02 = fstore.FeatureSet("fs02", entities=[fstore.Entity("km")],
                              engine="pandas")
    fs_02.set_targets(targets=[mlrun.datastore.ParquetTarget(name='s02', path="v3io:///projects/tst2/", partitioned=False)], with_defaults=False)
    fs_02.save()

    # generate sample data
    dataFrm01 = pandas.DataFrame(numpy.random.randint(low=0, high=1000,
                                                      size=(100, 10)),  # rows, columns
                                 columns=[f"fn{i}" for i in range(10)])
    dataFrm02 = pandas.DataFrame(numpy.random.randint(low=0, high=1000,
                                                      size=(100, 10)),  # rows, columns
                                 columns=[f"fn{i}" for i in range(10)])

    # ingest data with two different engines to the same parquet target
    fstore.ingest(fs_01, dataFrm01, overwrite=False)
    fstore.ingest(fs_02, dataFrm02, overwrite=False)
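
The storage idea behind principle 3 (two independent writers, one shared location, nothing overwritten) can be illustrated without MLRun. The sketch below is only an analogy under stated assumptions: it uses CSV files in a local temporary directory instead of Parquet on v3io, and the helpers `write_batch`, `read_all` and `demo` are hypothetical names, not part of any MLRun API. It says nothing about how MLRun lays out files internally.

```python
import csv
import tempfile
import uuid
from pathlib import Path


def write_batch(shared_dir: Path, writer_name: str, rows: list[dict]) -> Path:
    """Write one batch as a new file; existing files are left untouched."""
    # A unique file name per batch keeps independent writers from
    # clobbering each other's output in the shared directory.
    out_path = shared_dir / f"{writer_name}-{uuid.uuid4().hex}.csv"
    with out_path.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return out_path


def read_all(shared_dir: Path) -> list[dict]:
    """Read every batch in the shared directory back as one dataset."""
    combined = []
    for path in sorted(shared_dir.glob("*.csv")):
        with path.open(newline="") as handle:
            combined.extend(csv.DictReader(handle))
    return combined


def demo() -> list[dict]:
    """Two stand-in 'engines' write to one directory, then all rows are read."""
    with tempfile.TemporaryDirectory() as tmp:
        shared = Path(tmp)
        batch_a = [{"km": i, "fn0": i * 2} for i in range(100)]
        batch_b = [{"km": i, "fn0": i * 3} for i in range(100)]
        write_batch(shared, "storey", batch_a)  # stand-in for the first ingest
        write_batch(shared, "pandas", batch_b)  # stand-in for the second ingest
        return read_all(shared)


if __name__ == "__main__":
    print(len(demo()))
```

Both batches share the same schema, which is what makes reading the directory back as one dataset meaningful; the same assumption applies to the two feature sets in the sample above.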