
I ran into a problem using TFX, MLMD, and Apache Airflow as the orchestrator. TFX's LocalDagRunner works fine, producing distinct artifacts for each pipeline component run. The problem arises when Airflow is used as the orchestrator. The pipeline reruns without errors if you make no changes to the code. However, when you change some parameters (e.g., in the Tuner's config file), the pipeline fails to finish the run. Here is the error message:

ml_metadata.errors.AlreadyExistsError: Given node already exists: type_id: 20

Here is almost the entire log:

> AIRFLOW_CTX_DAG_OWNER=airflow
> AIRFLOW_CTX_DAG_ID=consumer_complaint_pipeline_airflow
> AIRFLOW_CTX_TASK_ID=Transform
> AIRFLOW_CTX_EXECUTION_DATE=2022-07-12T10:25:40.659461+00:00
> AIRFLOW_CTX_TRY_NUMBER=1
> AIRFLOW_CTX_DAG_RUN_ID=manual__2022-07-12T10:25:40.659461+00:00
> [2022-07-12 14:57:12,605] {base_component_launcher.py:189} INFO - Running driver for Transform
> [2022-07-12 14:57:12,612] {metadata_store.py:105} INFO - MetadataStore with DB connection initialized
> [2022-07-12 14:57:12,800] {taskinstance.py:1889} ERROR - Task failed with exception

> Traceback (most recent call last):
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/airflow/operators/python.py", line 171, in execute
>     return_value = self.execute_callable()
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/airflow/operators/python.py", line 189, in execute_callable
>     return self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/airflow/airflow_component.py", line 76, in _airflow_component_launcher
>     launcher.launch()
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 191, in launch
>     execution_decision = self._run_driver(self._input_dict, self._output_dict,
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 155, in _run_driver
>     execution_decision = driver.pre_execution(
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/dsl/components/base/base_driver.py", line 320, in pre_execution
>     self._metadata_handler.update_execution(
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/metadata.py", line 651, in update_execution
>     _, a_ids, _ = self.store.put_execution(execution, artifacts_and_events,
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 591, in put_execution
>     self._call('PutExecution', request, response)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 188, in _call
>     return self._call_method(method_name, request, response)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 209, in _call_method
>     self._pywrap_cc_call(cc_method, request, response)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 240, in _pywrap_cc_call
>     raise _make_exception(error_message.decode('utf-8'), status_code)

> ml_metadata.errors.AlreadyExistsError: Given node already exists: type_id: 20 uri: "/home/mlops/airflow/tfx/consumer_complaint_pipeline_airflow/Transform/transform_graph/24"

> custom_properties {
>   key: "name"
>   value {
>     string_value: "transform_graph"
>   }
> }
> custom_properties {
>   key: "producer_component"
>   value {
>     string_value: "Transform"
>   }
> }
> name: "transform_graph"

> INTERNAL: Cannot create node for type_id: 20 uri:

Parham Davari
  • This might be because of the naming convention used to save artifacts. Here is a [similar issue](https://github.com/tensorflow/tfx/issues/4977) opened in the `TFX` repo, which you can follow for more details. Thank you! –  Jul 14 '22 at 17:12
  • That's exactly right. Comparing the Artifact tables shows that LocalDagRunner uses timestamps to name artifacts (the 'name' column in the Artifact table), but Airflow does not. @TensorflowSupport – Parham Davari Jul 14 '22 at 20:52
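To see why the timestamped names matter: MLMD enforces uniqueness on an artifact's (type_id, name) pair, so re-registering the fixed name `transform_graph` for the same type collides on a rerun. Here is a minimal sqlite3 sketch that mimics that constraint (this is an illustrative mock, not the real MLMD schema; the table and function names are made up):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE artifact (
        id      INTEGER PRIMARY KEY,
        type_id INTEGER NOT NULL,
        name    TEXT NOT NULL,
        uri     TEXT,
        UNIQUE (type_id, name)   -- mirrors MLMD's per-type name uniqueness
    )
""")

def put_artifact(type_id, name, uri):
    """Register an artifact; raises sqlite3.IntegrityError on a name collision."""
    conn.execute(
        "INSERT INTO artifact (type_id, name, uri) VALUES (?, ?, ?)",
        (type_id, name, uri),
    )

# First run succeeds.
put_artifact(20, "transform_graph", "tfx/Transform/transform_graph/24")

# A rerun reusing the same fixed name collides -- analogous to the
# ml_metadata.errors.AlreadyExistsError seen in the Airflow-orchestrated run.
try:
    put_artifact(20, "transform_graph", "tfx/Transform/transform_graph/25")
except sqlite3.IntegrityError:
    print("duplicate (type_id, name) rejected")

# A unique, timestamp-suffixed name (what LocalDagRunner effectively does)
# avoids the collision.
put_artifact(20, f"transform_graph:{time.time_ns()}",
             "tfx/Transform/transform_graph/25")
```

The takeaway is that any orchestrator which registers artifacts under a constant name will hit this error on the second execution that produces the same artifact type.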

1 Answer


A PR has already been submitted to address the artifact naming issue. Once it is merged, please try again and let us know if the issue still persists. Thank you!