I'm trying Flink's DataStream API in Python. My environment is:
- flink: 1.16.0
- python package: apache-flink==1.16.0
My code (with imports included) was:

from pyflink.common.serialization import Encoder, SimpleStringSchema
from pyflink.common.typeinfo import BasicTypeInfo
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.enable_checkpointing(1000)

source = KafkaSource.builder() \
    .set_bootstrap_servers('xxx') \
    .set_topics("xxx") \
    .set_group_id("xxx") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_property('commit.offsets.on.checkpoint', 'true') \
    .build()

stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "team_config_source")

sink = FileSink \
    .for_row_format('/opt/result/', Encoder.simple_string_encoder("UTF-8")) \
    .with_output_file_config(OutputFileConfig.builder()
                             .with_part_prefix("team_config")
                             .with_part_suffix(".json")
                             .build()) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy(
        part_size=1024 ** 3,
        rollover_interval=15 * 60 * 1000,
        inactivity_interval=5 * 60 * 1000)) \
    .build()

def mapping(data):
    return data

stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
env.execute()
But Flink gives me this error:
2023-01-18 11:34:34 Traceback (most recent call last):
  File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 223, in <module>
    process2()
  File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 218, in process2
    stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 312, in map
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 654, in process
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
  File "<frozen zipimport>", line 259, in load_module
  File "/opt/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
The exception seems to be thrown in flink_fn_execution_pb2.py, so I checked that file in the installed package. It begins with:
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile('xxx')
_INPUT = DESCRIPTOR.message_types_by_name['Input']
But I also found that AddSerializedFile() has no return statement, so it always returns None. In protobuf's descriptor_pool.py, the function is defined as:
def AddSerializedFile(self, serialized_file_desc_proto):
  """Adds the FileDescriptorProto and its types to this pool.

  Args:
    serialized_file_desc_proto (bytes): A bytes string, serialization of the
      :class:`FileDescriptorProto` to add.
  """
  # pylint: disable=g-import-not-at-top
  from google.protobuf import descriptor_pb2
  file_desc_proto = descriptor_pb2.FileDescriptorProto.FromString(
      serialized_file_desc_proto)
  self.Add(file_desc_proto)
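To illustrate the failure mode with a stand-alone sketch (this is a hypothetical stub, not the real protobuf code): a Python function whose body never executes a return statement implicitly returns None, so any attribute access on its result raises exactly this AttributeError.

```python
def add_serialized_file_stub(serialized_file_desc_proto):
    # Hypothetical stand-in for the old AddSerializedFile(): it parses and
    # registers the file descriptor, but never returns it to the caller.
    parsed = {"raw": serialized_file_desc_proto}  # stand-in for FromString(...)
    pool = [parsed]                               # stand-in for self.Add(...)
    # no return statement -> the call evaluates to None

DESCRIPTOR = add_serialized_file_stub(b"xxx")
print(DESCRIPTOR is None)  # True

try:
    DESCRIPTOR.message_types_by_name["Input"]
except AttributeError as exc:
    # 'NoneType' object has no attribute 'message_types_by_name'
    print(exc)
```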
So the DESCRIPTOR in flink_fn_execution_pb2.py is always None, and the map() and flat_map() functions always fail with this exception.

Can anyone help with this problem? Am I using map() the wrong way, or is this a bug?
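For reference, in case the installed protobuf runtime version turns out to matter here (that's an assumption on my part, not something I've confirmed), this standard-library snippet prints the versions of the packages involved:

```python
# Print installed versions of the packages that may be involved.
# Assumption: a mismatch between the protobuf runtime and the generated
# flink_fn_execution_pb2.py could be relevant -- I'm not certain.
from importlib.metadata import PackageNotFoundError, version

for pkg in ("apache-flink", "protobuf"):
    try:
        print(pkg, version(pkg))
    except PackageNotFoundError:
        print(pkg, "not installed")
```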