python machine-learning neuraxle

How to implement a repository for lazy data loading with neuraxle?


In the neuraxle documentation there is an example that uses a repository for lazily loading data within a pipeline; see the following code:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.base import ExecutionContext
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper

training_data_ids = training_data_repository.get_all_ids()
context = ExecutionContext('caching_folder').set_service_locator({
    BaseRepository: training_data_repository
})

pipeline = Pipeline([
    ConvertIDsToLoadedData().assert_has_services(BaseRepository),
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
]).with_context(context)

However, it is not shown how to implement the BaseRepository and ConvertIDsToLoadedData classes. What would be the best way to implement those classes? Could anyone give an example?


Solution

  • I didn't check whether or not the following compiles, but it should look like what follows. Please edit this answer if you try it out and find something to change:

    from abc import ABC, abstractmethod
    from typing import Dict, List

    from neuraxle.base import BaseStep, ExecutionContext
    from neuraxle.data_container import DataContainer


    class BaseDataRepository(ABC):
    
        @abstractmethod
        def get_all_ids(self) -> List[int]: 
            pass
    
        @abstractmethod
        def get_data_from_id(self, _id: int) -> object: 
            pass
    
    class InMemoryDataRepository(BaseDataRepository): 
        def __init__(self, ids, data): 
            self.ids: List[int] = ids
            self.data: Dict[int, object] = data
    
        def get_all_ids(self) -> List[int]: 
            return list(self.ids)
    
        def get_data_from_id(self, _id: int) -> object: 
            return self.data[_id]
    
    class ConvertIDsToLoadedData(BaseStep): 
        def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext): 
            repo: BaseDataRepository = context.get_service(BaseDataRepository)
            ids = data_container.data_inputs
    
            # Replace data ids by their loaded object counterpart: 
            data_container.data_inputs = [repo.get_data_from_id(_id) for _id in ids]
    
            return data_container
    
    context = ExecutionContext('caching_folder').set_service_locator({
        BaseDataRepository: InMemoryDataRepository(ids, data)  # or any other class that inherits from `BaseDataRepository`, e.g. one backed by a real database (SQL) instead of this cheap "InMemory" stub.
    })
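
    As a rough usage sketch (my own addition, not taken from the answer above and not checked against a running Neuraxle install), the pieces could be wired together as follows. The `ids` and `data` values, the `repository` and `loaded` names, and the one-step pipeline are placeholder assumptions; in practice the step would sit at the top of the full pipeline shown in the question:

    from neuraxle.pipeline import Pipeline

    ids = [0, 1, 2]
    data = {0: [1.0, 2.0], 1: [3.0, 4.0], 2: [5.0, 6.0]}  # hypothetical pre-loaded objects
    repository = InMemoryDataRepository(ids, data)

    context = ExecutionContext('caching_folder').set_service_locator({
        BaseDataRepository: repository
    })

    pipeline = Pipeline([
        # Only the ids enter the pipeline; this step swaps them for the
        # repository's loaded objects through the registered service.
        ConvertIDsToLoadedData().assert_has_services(BaseDataRepository),
        # ... the remaining steps from the question's pipeline would go here ...
    ]).with_context(context)

    loaded = pipeline.transform(repository.get_all_ids())

    The idea is that only lightweight ids travel through the pipeline's entry point, and the actual data is fetched lazily inside the pipeline via the `BaseDataRepository` service registered on the context.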
    

    For updates, see the issue I opened for this question: https://github.com/Neuraxio/Neuraxle/issues/421