
I'm trying out Flink's DataStream API in Python. My environment is:

  • flink: 1.16.0
  • python package: apache-flink==1.16.0
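For reference, this is how I checked the installed versions (a small `importlib.metadata` snippet, standard library on Python 3.8+; protobuf is pulled in transitively by apache-flink, so its version can drift independently):

```python
# Print the versions of the packages involved; protobuf is installed
# transitively by apache-flink, so its version can drift on its own.
import importlib.metadata as md

versions = {}
for pkg in ("apache-flink", "protobuf"):
    try:
        versions[pkg] = md.version(pkg)
    except md.PackageNotFoundError:
        versions[pkg] = None  # package not installed in this environment
    print(pkg, versions[pkg])
```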

My code was:

    from pyflink.common.serialization import SimpleStringSchema, Encoder
    from pyflink.common.typeinfo import BasicTypeInfo
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
    from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(1000)

    source = KafkaSource.builder() \
        .set_bootstrap_servers('xxx') \
        .set_topics("xxx") \
        .set_group_id("xxx") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_property('commit.offsets.on.checkpoint', 'true') \
        .build()

    stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "team_config_source")

    sink = FileSink \
        .for_row_format('/opt/result/', Encoder.simple_string_encoder("UTF-8")) \
        .with_output_file_config(OutputFileConfig.builder()
                                 .with_part_prefix("team_config")
                                 .with_part_suffix(".json")
                                 .build()) \
        .with_rolling_policy(RollingPolicy.default_rolling_policy(part_size=1024 ** 3, rollover_interval=15 * 60 * 1000,
                                                                  inactivity_interval=5 * 60 * 1000)) \
        .build()

    def mapping(data):
        return data

    stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
    env.execute()

But Flink gives me this error:

    2023-01-18 11:34:34 Traceback (most recent call last):
      File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
        exec(code, run_globals)
      File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 223, in <module>
        process2()
      File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 218, in process2
        stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
      File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 312, in map
      File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 654, in process
      File "<frozen importlib._bootstrap>", line 991, in _find_and_load
      File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
      File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
      File "<frozen zipimport>", line 259, in load_module
      File "/opt/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
    AttributeError: 'NoneType' object has no attribute 'message_types_by_name'

The exception is thrown from flink_fn_execution_pb2.py, so I checked that file in the installed package. The code at the beginning of the file is:

    DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile('xxx')
    _INPUT = DESCRIPTOR.message_types_by_name['Input']

But I also found that AddSerializedFile() has no return statement, so it always returns None. The function in descriptor_pool.py is:

    def AddSerializedFile(self, serialized_file_desc_proto):
      """Adds the FileDescriptorProto and its types to this pool.

      Args:
        serialized_file_desc_proto (bytes): A bytes string, serialization of the
          :class:`FileDescriptorProto` to add.
      """

      # pylint: disable=g-import-not-at-top
      from google.protobuf import descriptor_pb2
      file_desc_proto = descriptor_pb2.FileDescriptorProto.FromString(
          serialized_file_desc_proto)
      self.Add(file_desc_proto)

So the DESCRIPTOR in flink_fn_execution_pb2.py is always None, and map() and flat_map() always fail with this exception. Can anyone help with this problem? Am I using map() the wrong way, or is this a bug?
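To convince myself this is what happens, I wrote a tiny toy sketch that mimics the shape of the code above (FakePool is my own stand-in, not the real protobuf descriptor pool):

```python
# Toy sketch of the failure mode -- FakePool is hypothetical, not protobuf's class.
class FakePool:
    def __init__(self):
        self._files = {}

    def Add(self, file_desc_proto):
        self._files[file_desc_proto["name"]] = file_desc_proto

    def AddSerializedFile(self, serialized):
        # Parses and registers the file, but has no return statement,
        # so the call implicitly evaluates to None.
        self.Add({"name": "flink-fn-execution.proto", "data": serialized})

DESCRIPTOR = FakePool().AddSerializedFile(b"xxx")
print(DESCRIPTOR)  # None
```

Any attribute access on the None result, such as `DESCRIPTOR.message_types_by_name`, then raises exactly the AttributeError shown in the traceback.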

  • Exact same issue here. Strange, as the code I am running comes from the official examples: https://nightlies.apache.org/flink/flink-docs-release-1.16/api/python/examples/table/word_count.html – Max Krog Jan 28 '23 at 22:18
  • @MaxKrog I also submitted this problem to Apache Flink's Slack channel, but there was no reply. I have had to give up trying to use `DataStream.map()` – Rinze Jan 30 '23 at 02:55

0 Answers