Thanks a lot for any help!!!

code:

from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Create the table environment in streaming mode
env_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
env_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)

table1 = env_stream.from_elements([(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')], ['id', 'order_amt', 'name'])
table2 = env_stream.from_elements([(1, 43.4, 'xixi'), (2, 53.4, 'rr'), (3, 65.6, 'ww')], ['id2', 'order_amt2', 'name'])

# types: List[TypeInformation], field_names: List[str]
# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], ['id', 'order_amt', 'name'])
row_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])


stream = env_stream.to_append_stream(table1, row_type_info)

error info:

Traceback (most recent call last):
  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o4.toAppendStream. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method toAppendStream([class org.apache.flink.table.api.internal.TableImpl, class org.apache.flink.api.java.typeutils.TupleTypeInfo]) does not exist
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
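
A possible workaround, offered as a sketch rather than a confirmed fix: the trace says the Java object behind `env_stream` has no `toAppendStream` method. One assumption consistent with that is that `StreamTableEnvironment.create` returns a plain table environment when it is given only `environment_settings`, and that passing a `StreamExecutionEnvironment` as well yields an environment that supports the conversion. The function name `build_append_stream` is hypothetical, and the field types `LONG`, `DOUBLE`, `STRING` are an assumption about how `from_elements` infers Python `int`, `float`, and `str` values.

```python
def build_append_stream():
    """Sketch: convert a Table to an append DataStream in PyFlink 1.12.

    Assumption: the table environment must be created on top of a
    StreamExecutionEnvironment for to_append_stream to be available.
    """
    # Imported inside the function so this file can be loaded without PyFlink.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    settings = (EnvironmentSettings.new_instance()
                .use_blink_planner()
                .in_streaming_mode()
                .build())
    # Pass the execution environment in addition to the settings.
    t_env = StreamTableEnvironment.create(
        stream_execution_environment=exec_env,
        environment_settings=settings)

    table1 = t_env.from_elements(
        [(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')],
        ['id', 'order_amt', 'name'])

    # Assumed row type: it should match the table schema (id is numeric, not a string).
    row_type = Types.ROW([Types.LONG(), Types.DOUBLE(), Types.STRING()])
    stream = t_env.to_append_stream(table1, row_type)
    return exec_env, stream
```

Calling `build_append_stream()` in the environment listed below would show whether the "does not exist" error goes away. If the declared row type does not match the inferred table schema, a different type-mismatch error may appear instead.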

env:

  1. apache-flink 1.12.0 (PyFlink)
  2. py4j 0.10.8.1 (installed automatically as a dependency by pip3 install apache-flink)
  3. Python 3.7 (Anaconda)
  4. PyCharm 2020.1.1
  5. macOS 11.1

[image: debug info image 1]

[image: debug info image 2]

reproduction steps:

  1. In the same environment, run the code locally (local mode).
  2. Set a breakpoint on the line "stream = env_stream.to_append_stream(table1, row_type_info)".
  3. Run in debug mode. The breakpoint is hit twice: the first time the toAppendStream method is not found, the second time it is found. The exception is raised on the first hit.
Shayan Shafiq
SZ_MaNong

0 Answers