
I'd like to use a Python library (pyod, latest) in a UDF that has a dependency on Numba (>= 0.50). I created an Aggregation UDF in Python and I am not new to the concept.

The job fails with the following error immediately after it starts executing:

Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
    processor = bundle_processor.BundleProcessor(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
    return collections.OrderedDict([(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
    transform_consumers = {
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
    return transform_factory.create_operation(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 89, in create_group_window_aggregate_function
    return _create_user_defined_function_operation(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 174, in _create_user_defined_function_operation
    return beam_operation_cls(
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 210, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 214, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.generate_operation
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 446, in __init__
    super(StreamGroupWindowAggregateOperation, self).__init__(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 309, in __init__
    super(AbstractStreamGroupAggregateOperation, self).__init__(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 281, in __init__
    super(BaseStatefulOperation, self).__init__(serialized_fn)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
    self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 329, in generate_func
    extract_user_defined_aggregate_function(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 221, in extract_user_defined_aggregate_function
    user_defined_agg = load_aggregate_function(user_defined_function_proto.payload)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 281, in load_aggregate_function
    return pickle.loads(payload)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
  File "/tmp/python-dist-ca64683e-f3c8-4ff9-b2a8-8c95c5d508bd/python-files/blob_p-1eee456524b0a216bf998cb36288df034d60c922-5797c5572fd29f1e17b5dd686b627324/dbscan_udf.py", line 31, in <module>
    from pyod.models.ecod import ECOD
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/__init__.py", line 4, in <module>
    from . import utils
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/utils/__init__.py", line 12, in <module>
    from .stat_models import pairwise_distances_no_broadcast
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/utils/stat_models.py", line 11, in <module>
    from numba import njit
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/__init__.py", line 38, in <module>
    from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
    from numba.stencils.stencil import stencil
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
    from numba.core import types, typing, utils, ir, config, ir_utils, registry
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
    from numba.core import utils, typing, dispatcher, cpu
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/dispatcher.py", line 13, in <module>
    from numba.core import (
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
    from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
    from numba.core.base import PYOBJECT, GENERIC_POINTER
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/base.py", line 24, in <module>
    from numba.cpython import builtins
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/cpython/builtins.py", line 524, in <module>
    from numba.core.typing.builtins import IndexValue, IndexValueType
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
    @infer_global(print)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
    if getattr(mod, val.__name__) is not val:
AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'

My library versions (the relevant ones):

  • numba==0.55.1
  • numpy==1.19.5
  • apache-beam==2.27.0
  • apache-flink==1.15.1
  • pyod==1.0.4

Since it is a very strange error caused by Beam, I cannot interpret it.

Does anyone have any idea what is going on?

  • I'm not exactly sure what's going wrong, but the error you mentioned here is just a secondary error that occurred when Beam tried to log an exception or an error. Are you seeing any other logs that could be related? – chamikara Aug 23 '22 at 00:05
  • The error is related to the import `from pyod.models.ecod import ECOD` while unpickling the job. It is not a runtime error: I checked the code with the PyFlink Table API (it does not use apache-beam to execute Python) and can train and predict as I want. – Metehan Yıldırım Aug 23 '22 at 07:06
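
One possible reading of the last traceback frame, sketched without Flink or Numba: the check `getattr(mod, val.__name__) is not val` looks up `print` in the module named by `print.__module__`. The error message suggests that, inside the worker, `print.__module__` is `pyflink.fn_execution.beam.beam_sdk_worker_main` rather than `builtins`, which would be the case if `builtins.print` had been replaced there by a method of some wrapper object. That replacement is an assumption inferred from the message only. The names `fake_worker_main`, `CustomPrint`, and `reachable_check` below are hypothetical stand-ins, not PyFlink or Numba APIs.

```python
import builtins
import sys
import types

# Hypothetical stand-in for the worker entry module named in the error message.
worker = types.ModuleType("fake_worker_main")
sys.modules[worker.__name__] = worker

# Define a wrapper class inside the stand-in module, so that its methods
# report __module__ == "fake_worker_main". The module itself gets no
# top-level attribute called "print".
exec(
    '''
class CustomPrint:
    def __init__(self, original):
        self._original = original

    def print(self, *args, **kwargs):
        self._original(*args, **kwargs)
''',
    worker.__dict__,
)


def reachable_check(val):
    """Same lookup as the line shown in the last traceback frame."""
    mod = sys.modules[val.__module__]
    return getattr(mod, val.__name__) is val


original_print = builtins.print
check_before_patch = reachable_check(print)  # looks up builtins.print

# Assumed worker behaviour: swap the builtin for a bound method.
builtins.print = worker.CustomPrint(original_print).print
try:
    reachable_check(print)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
finally:
    builtins.print = original_print  # always restore the real builtin

original_print(check_before_patch)
original_print(error_message)
```

If this reading is right, the failure is independent of pyod itself: any import of Numba in a process where `print` has been swapped this way would hit the same check.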
