The Neuraxle documentation shows an example that uses a repository to lazily load data within a pipeline. See the following code:
from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.base import ExecutionContext
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper

training_data_ids = training_data_repository.get_all_ids()
context = ExecutionContext('caching_folder').set_service_locator({
    BaseRepository: training_data_repository
})

pipeline = Pipeline([
    ConvertIDsToLoadedData().assert_has_services(BaseRepository),
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categeories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
]).with_context(context)
However, the documentation does not show how to implement the BaseRepository and ConvertIDsToLoadedData classes. What would be the best way to implement those classes? Could anyone give an example?
I haven't checked whether the following actually runs, but it should look something like this. If you try running it and find something that needs to change, please edit this answer:
from abc import ABC, abstractmethod
from typing import Dict, List

from neuraxle.base import BaseStep, ExecutionContext
from neuraxle.data_container import DataContainer


class BaseDataRepository(ABC):
    @abstractmethod
    def get_all_ids(self) -> List[int]:
        pass

    @abstractmethod
    def get_data_from_id(self, _id: int) -> object:
        pass


class InMemoryDataRepository(BaseDataRepository):
    def __init__(self, ids, data):
        self.ids: List[int] = ids
        self.data: Dict[int, object] = data

    def get_all_ids(self) -> List[int]:
        return list(self.ids)

    def get_data_from_id(self, _id: int) -> object:
        return self.data[_id]


class ConvertIDsToLoadedData(BaseStep):
    def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext):
        repo: BaseDataRepository = context.get_service(BaseDataRepository)
        ids = data_container.data_inputs
        # Replace data ids by their loaded object counterpart:
        data_container.data_inputs = [repo.get_data_from_id(_id) for _id in ids]
        return data_container, context


# `InMemoryDataRepository` is a cheap stub. Swap in any other class that inherits from
# `BaseDataRepository` once you move to a real database (e.g. SQL).
context = ExecutionContext('caching_folder').set_service_locator({
    BaseDataRepository: InMemoryDataRepository(ids, data)
})
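To illustrate the "swap in a real database later" idea, here is a rough sketch of what a SQL-backed replacement could look like. It is not from the Neuraxle docs: `SqliteDataRepository`, the `samples` table, its `payload` column, and the `load_ids` helper are all names I made up for the example. The sketch deliberately leaves Neuraxle out so that it can run on its own, and `load_ids` only mirrors the list comprehension used in `ConvertIDsToLoadedData`.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import List


class BaseDataRepository(ABC):
    """Same interface as in the answer, repeated so the sketch runs alone."""

    @abstractmethod
    def get_all_ids(self) -> List[int]:
        ...

    @abstractmethod
    def get_data_from_id(self, _id: int) -> object:
        ...


class SqliteDataRepository(BaseDataRepository):
    """Hypothetical repository that fetches one row per ID, only when asked."""

    def __init__(self, connection: sqlite3.Connection):
        self.connection = connection

    def get_all_ids(self) -> List[int]:
        # Only the IDs are read here; the payloads stay in the database.
        rows = self.connection.execute("SELECT id FROM samples ORDER BY id").fetchall()
        return [row[0] for row in rows]

    def get_data_from_id(self, _id: int) -> object:
        row = self.connection.execute(
            "SELECT payload FROM samples WHERE id = ?", (_id,)
        ).fetchone()
        if row is None:
            raise KeyError(_id)
        return row[0]


def load_ids(repo: BaseDataRepository, ids: List[int]) -> List[object]:
    # Stand-in for the body of ConvertIDsToLoadedData, without Neuraxle.
    return [repo.get_data_from_id(_id) for _id in ids]


# Build a throwaway in-memory database for the demonstration.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE samples (id INTEGER PRIMARY KEY, payload TEXT)")
connection.executemany(
    "INSERT INTO samples (id, payload) VALUES (?, ?)",
    [(1, "first"), (2, "second"), (3, "third")],
)

repo = SqliteDataRepository(connection)
all_ids = repo.get_all_ids()
loaded = load_ids(repo, all_ids[:2])  # only the first two rows are fetched
```

Because the step only talks to the `BaseDataRepository` interface, registering an instance like this in the service locator instead of `InMemoryDataRepository` should not require any change to `ConvertIDsToLoadedData` itself.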
For updates, see the issue I opened here for this question: https://github.com/Neuraxio/Neuraxle/issues/421