I ran into a problem using TFX and MLMD with Apache Airflow as the orchestrator. The LocalDagRunner provided by TFX works fine and produces distinct artifacts for each component run. The problem appears only when Airflow is the orchestrator. If I rerun the pipeline without changing any code, it completes without errors. However, when I change some parameters (e.g., in the Tuner's config file), the pipeline fails to finish a run. Here is the error message:
ml_metadata.errors.AlreadyExistsError: Given node already exists: type_id: 20
Here is almost the entire log:
> AIRFLOW_CTX_DAG_OWNER=airflow
> AIRFLOW_CTX_DAG_ID=consumer_complaint_pipeline_airflow
> AIRFLOW_CTX_TASK_ID=Transform
> AIRFLOW_CTX_EXECUTION_DATE=2022-07-12T10:25:40.659461+00:00
> AIRFLOW_CTX_TRY_NUMBER=1
> AIRFLOW_CTX_DAG_RUN_ID=manual__2022-07-12T10:25:40.659461+00:00
> [2022-07-12 14:57:12,605] {base_component_launcher.py:189} INFO - Running driver for Transform
> [2022-07-12 14:57:12,612] {metadata_store.py:105} INFO - MetadataStore with DB connection initialized
> [2022-07-12 14:57:12,800] {taskinstance.py:1889} ERROR - Task failed with exception
> Traceback (most recent call last):
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/airflow/operators/python.py", line 171, in execute
>     return_value = self.execute_callable()
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/airflow/operators/python.py", line 189, in execute_callable
>     return self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/airflow/airflow_component.py", line 76, in _airflow_component_launcher
>     launcher.launch()
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 191, in launch
>     execution_decision = self._run_driver(self._input_dict, self._output_dict,
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 155, in _run_driver
>     execution_decision = driver.pre_execution(
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/dsl/components/base/base_driver.py", line 320, in pre_execution
>     self._metadata_handler.update_execution(
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/tfx/orchestration/metadata.py", line 651, in update_execution
>     _, a_ids, _ = self.store.put_execution(execution, artifacts_and_events,
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 591, in put_execution
>     self._call('PutExecution', request, response)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 188, in _call
>     return self._call_method(method_name, request, response)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 209, in _call_method
>     self._pywrap_cc_call(cc_method, request, response)
>   File "/home/mlops/GIT_REPO/davari/lib/python3.9/site-packages/ml_metadata/metadata_store/metadata_store.py", line 240, in _pywrap_cc_call
>     raise _make_exception(error_message.decode('utf-8'), status_code)
> ml_metadata.errors.AlreadyExistsError: Given node already exists: type_id: 20
> uri: "/home/mlops/airflow/tfx/consumer_complaint_pipeline_airflow/Transform/transform_graph/24"
> custom_properties {
>   key: "name"
>   value {
>     string_value: "transform_graph"
>   }
> }
> custom_properties {
>   key: "producer_component"
>   value {
>     string_value: "Transform"
>   }
> }
> name: "transform_graph"
> INTERNAL: Cannot create node for type_id: 20 uri: