
Similar to the question described earlier, I followed the spaceflights tutorial. At the create pipeline step, I got the following error when running kedro run --node=preprocess_companies_node:

ValueError: Pipeline does not contain nodes named ['preprocess_companies_node'].

The relevant files are specified as instructed in the tutorial:

  • src/kedro_tutorial/pipelines/data_processing/pipeline.py
from kedro.pipeline import Pipeline, node

from .nodes import preprocess_companies, preprocess_shuttles

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
        ]
    )
  • src/kedro_tutorial/pipelines/data_processing/nodes.py
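import pandas as pd

# The helpers _is_true, _parse_percentage and _parse_money are used below
# but are not shown in this excerpt.


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame: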
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
  • src/kedro_tutorial/pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline

from kedro_tutorial.pipelines import data_processing as dp


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipeline.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.

    """
    data_processing_pipeline = dp.create_pipeline()

    return {
        "__default__": data_processing_pipeline,
        "dp": data_processing_pipeline,
    }

I made sure that I have registered a __default__ pipeline and that my node name matches the one in the command exactly: preprocess_companies_node.

My Kedro version is 0.16.6 and my Python version is 3.7.10.

Any idea what I did wrong here?

Thank you.

got2nosth
  • What happens when you do `kedro run --pipeline=dp --node=preproces_companies_node`? – swimmer Jul 19 '21 at 17:11
  • @Ignacio that leads me to the following error: `kedro.framework.context.context.KedroContextError: Failed to find the pipeline named 'dp'. It needs to be generated and returned by the '_get_pipelines' function.` It looks like the pipeline `dp` is not generated. Any hint what might fix it? – got2nosth Jul 20 '21 at 01:27

1 Answer


The issue is that you are following the tutorial for version 0.17.3+ while using kedro==0.16.6. This is an easy mistake to make, don't fret. The pipeline_registry.py module was introduced in 0.17.3, so 0.16.6 never reads it and your pipelines are never registered. Your options are to upgrade to the latest kedro version or to register your pipelines in a module called hooks.py rather than pipeline_registry.py.

# src/<project_name>/hooks.py
"""Project hooks."""
from typing import Any, Dict, Iterable, Optional

from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal

# "sixteen" is the package name of this example project; use your own package name here
from sixteen.pipelines import data_engineering as de
from sixteen.pipelines import data_science as ds


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """
        data_engineering_pipeline = de.create_pipeline()
        data_science_pipeline = ds.create_pipeline()

        return {
            "de": data_engineering_pipeline,
            "ds": data_science_pipeline,
            "__default__": data_engineering_pipeline + data_science_pipeline,
        }

    @hook_impl
    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
        return ConfigLoader(conf_paths)

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, Any]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )


project_hooks = ProjectHooks()
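
The version cut-off described above can be sketched as a small check. This is only an illustration: `parse_version` and `registration_module` are hypothetical helpers, not part of Kedro, and they encode nothing beyond the 0.17.3 boundary mentioned in this answer. The sketch assumes a plain `X.Y.Z` version string and does not handle pre-release suffixes or older project layouts.

```python
from typing import Tuple

# Assumption taken from the answer text: pipeline_registry.py exists from 0.17.3 onwards.
REGISTRY_INTRODUCED: Tuple[int, int, int] = (0, 17, 3)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a plain 'X.Y.Z' string into a tuple of ints so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def registration_module(kedro_version: str) -> str:
    """Return the module where register_pipelines is expected to live (hypothetical helper)."""
    if parse_version(kedro_version) >= REGISTRY_INTRODUCED:
        return "pipeline_registry.py"
    return "hooks.py"


print(registration_module("0.16.6"))
print(registration_module("0.17.3"))
```

In a real project you could pass `kedro.__version__` instead of a hard-coded string to see which module your installed version is expected to read.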

You can generate a full example for this version for yourself by running a kedro new command against this version.

# these two bash commands are safe to run outside of a virtual environment
# pipx creates the virtual environment for you
pip install pipx
pipx run --spec kedro==0.16.6 kedro new

The rest of your code looks like valid 0.16.6 kedro to me. Once you have moved your pipeline registration from pipeline_registry.py into hooks.py, you can run the kedro pipeline list command to confirm that kedro is picking up your pipeline code.

Waylon Walker
  • Thanks for the great info. Just to clarify, in `0.17.3`, do I need to declare the pipelines in both `pipeline_registry.py` and `hooks.py`? – got2nosth Jul 21 '21 at 05:44
  • No, you can declare your pipelines in either. I would highly recommend picking just one, but if you do have both, hooks.py will take precedence and overwrite any pipelines with conflicting names. This post covers it in a bit more detail. https://waylonwalker.com/kedro-pipeline-registry/#conflict-resolution – Waylon Walker Jul 21 '21 at 14:53
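
The precedence rule described in the last comment can be illustrated with plain dictionaries. This is a minimal sketch and not Kedro's actual implementation: `resolve_pipelines` is a hypothetical function, and the string values stand in for real `Pipeline` objects. It only demonstrates the stated behaviour, that an entry from hooks.py replaces an entry with the same name from pipeline_registry.py.

```python
from typing import Dict


def resolve_pipelines(registry: Dict[str, str], hooks: Dict[str, str]) -> Dict[str, str]:
    """Merge two name -> pipeline mappings; entries from hooks win on a name conflict."""
    merged = dict(registry)  # start from the pipeline_registry.py entries
    merged.update(hooks)     # hooks.py entries overwrite any duplicates
    return merged


# Strings are placeholders for Pipeline objects (assumption for illustration only).
registry = {"__default__": "registry default", "dp": "registry dp"}
hooks = {"__default__": "hooks default", "de": "hooks de"}

resolved = resolve_pipelines(registry, hooks)
print(resolved)
```

Names that appear in only one of the two mappings pass through unchanged, so only `__default__` is affected in this example.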