I am getting the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Versions: Windows 10, Python 3.8.10, py4j 0.10.9.7, apache-flink 1.17.1, Java 1.8.0_351.
import logging
import os
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
# created before executing this job.
def write_to_kafka(env):
    """Send a small fixed collection of (int, str) rows to Kafka as JSON.

    The rows are serialized with a JSON row schema built from the same row
    type as the stream, written to the 'test_json_topic' topic through a
    FlinkKafkaProducer sink, and the job is then executed on ``env``.

    :param env: the execution environment used to build and run the job.
    """
    row_type = Types.ROW([Types.INT(), Types.STRING()])

    records = [
        (1, 'hi'),
        (2, 'hello'),
        (3, 'hi'),
        (4, 'hello'),
        (5, 'hi'),
        (6, 'hello'),
        (6, 'hello'),
    ]
    stream = env.from_collection(records, type_info=row_type)

    schema_builder = JsonRowSerializationSchema.Builder()
    json_schema = schema_builder.with_type_info(row_type).build()

    producer_settings = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'test_group',
    }
    sink = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=json_schema,
        producer_config=producer_settings,
    )

    # The stream feeding this sink must carry a row type (RowTypeInfo),
    # which is why `type_info` is passed to from_collection above.
    stream.add_sink(sink)
    env.execute()
def read_from_kafka(env):
    """Read JSON rows from the 'test_json_topic' topic and print them.

    A FlinkKafkaConsumer configured to start from the earliest offset is
    added as a source, its records are deserialized as (int, str) rows,
    printed, and the job is then executed on ``env``.

    :param env: the execution environment used to build and run the job.
    """
    row_type = Types.ROW([Types.INT(), Types.STRING()])
    json_schema = JsonRowDeserializationSchema.Builder().type_info(row_type).build()

    consumer_settings = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'test_group_1',
    }
    source = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=json_schema,
        properties=consumer_settings,
    )
    source.set_start_from_earliest()

    stream = env.add_source(source)
    stream.print()
    env.execute()
if __name__ == '__main__':
    # Imported locally so this entry-point block is self-contained.
    from pathlib import Path

    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # The Kafka connector jar is expected to sit next to this script.
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka-1.17.1.jar')

    env = StreamExecutionEnvironment.get_execution_environment()
    # Build the jar URL with Path.as_uri() rather than string formatting:
    # on Windows it yields "file:///C:/dir/x.jar" (forward slashes, special
    # characters percent-encoded) instead of a URL containing backslashes,
    # and on POSIX it avoids the four-slash "file:////..." form.
    env.add_jars(Path(kafka_jar).as_uri())

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
Note: I have taken the code from this repo: Official PyFlink Examples