1

In the neuraxle documentation there is an example shown, using a repository for lazy loading data within a pipeline, see the following code:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.base import ExecutionContext
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper

training_data_ids = training_data_repository.get_all_ids()
context = ExecutionContext('caching_folder').set_service_locator({
    BaseRepository: training_data_repository
})

pipeline = Pipeline([
    ConvertIDsToLoadedData().assert_has_services(BaseRepository),
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categeories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
]).with_context(context)

However, it is not shown, how to implement the BaseRepository and ConvertIDsToLoadedData classes. What would be the best way to implement those classes? Could anyone give an example?

eL_BaRTo
  • 82
  • 1
  • 5

1 Answers1

0

I didn't check wheter or not the following compiles, but it should look like what follows. Please someone edit this answer if you find something to change and tried to compile it:

class BaseDataRepository(ABC): 

    @abstractmethod
    def get_all_ids(self) -> List[int]: 
        pass

    @abstractmethod
    def get_data_from_id(self, _id: int) -> object: 
        pass

class InMemoryDataRepository(BaseDataRepository): 
    def __init__(self, ids, data): 
        self.ids: List[int] = ids
        self.data: Dict[int, object] = data

    def get_all_ids(self) -> List[int]: 
        return list(self.ids)

    def get_data_from_id(self, _id: int) -> object: 
        return self.data[_id]

class ConvertIDsToLoadedData(BaseStep): 
    def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext): 
        repo: BaseDataRepository = context.get_service(BaseDataRepository)
        ids = data_container.data_inputs

        # Replace data ids by their loaded object counterpart: 
        data_container.data_inputs = [repo.get_data_from_id(_id) for _id in ids]

        return data_container, context

context = ExecutionContext('caching_folder').set_service_locator({
    BaseDataRepository: InMemoryDataRepository(ids, data)  # or insert here any other replacement class that inherits from `BaseDataRepository` when you'll change the database to a real one (e.g.: SQL) rather than a cheap "InMemory" stub. 
})

For updates, see the issue I opened here for this question: https://github.com/Neuraxio/Neuraxle/issues/421

Guillaume Chevalier
  • 9,613
  • 8
  • 51
  • 79