0

I am getting the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Versions: Windows 10, Python 3.8.10, py4j 0.10.9.7, apache-flink 1.17.1, Java 1.8.0_351

import logging
import os
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema


# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
# created before executing this job.
def write_to_kafka(env):
    """Publish a small fixed collection of rows to Kafka as JSON.

    Builds a stream from an in-memory list of ``(int, str)`` tuples typed as
    a ROW, attaches a ``FlinkKafkaProducer`` sink for the topic
    ``test_json_topic`` on ``localhost:9092``, and executes the job.

    :param env: the execution environment used to build and run the job.
    """
    row_type = Types.ROW([Types.INT(), Types.STRING()])
    records = [
        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
        (5, 'hi'), (6, 'hello'), (6, 'hello'),
    ]
    stream = env.from_collection(records, type_info=row_type)

    # The JSON serializer is built from the same row type as the stream.
    schema_builder = JsonRowSerializationSchema.Builder()
    json_schema = schema_builder.with_type_info(row_type).build()

    producer_settings = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    sink = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=json_schema,
        producer_config=producer_settings,
    )

    # The stream handed to the sink must carry RowTypeInfo as its output type.
    stream.add_sink(sink)
    env.execute()


def read_from_kafka(env):
    """Consume JSON rows from Kafka and print them.

    Creates a ``FlinkKafkaConsumer`` for the topic ``test_json_topic`` on
    ``localhost:9092`` (consumer group ``test_group_1``), configured to start
    from the earliest offset, adds it as a source, prints each record, and
    executes the job.

    :param env: the execution environment used to build and run the job.
    """
    schema_builder = JsonRowDeserializationSchema.Builder()
    row_type = Types.ROW([Types.INT(), Types.STRING()])
    json_schema = schema_builder.type_info(row_type).build()

    consumer_settings = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    source = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=json_schema,
        properties=consumer_settings,
    )
    # Read the topic from its beginning.
    source.set_start_from_earliest()

    stream = env.add_source(source)
    stream.print()
    env.execute()


if __name__ == '__main__':
    # Script entry point: register the Kafka connector jar with the
    # environment, then run the write job followed by the read job.
    from pathlib import Path

    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # The connector jar is looked up in the same directory as this script.
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka-1.17.1.jar')
    env = StreamExecutionEnvironment.get_execution_environment()

    # Build the jar URL with Path.as_uri() instead of string formatting:
    # "file:///{}".format(path) leaves Windows backslashes and spaces
    # unescaped (and yields four slashes on POSIX), whereas as_uri() emits
    # a well-formed, percent-encoded file URI on every platform.
    env.add_jars(Path(kafka_jar).as_uri())

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)

Note: I have taken the code from this repo: Official PyFlink Examples

user3024119
  • 190
  • 1
  • 2
  • 11
  • I tried this, but it didn't work: https://stackoverflow.com/questions/50064646/py4j-protocol-py4jjavaerror-occurred-while-calling-zorg-apache-spark-api-python – user3024119 Jun 12 '23 at 09:43
  • I also referred to this link: https://stackoverflow.com/questions/75976664/reading-from-kafka-with-pyflink-not-working – user3024119 Jun 12 '23 at 09:47
  • The Apache team has closed the issue without a solution: https://issues.apache.org/jira/browse/FLINK-27478 – user3024119 Jun 12 '23 at 09:59
  • I have tried this solution, but it didn't work: https://stackoverflow.com/questions/72028346/pyflink-with-kafka-java-lang-runtimeexception-failed-to-create-stage-bundle-fac/72037218#72037218 – user3024119 Jun 12 '23 at 10:00

0 Answers0