
I'm working on a PyFlink job that reads data from a Kafka topic using the FlinkKafkaConsumer connector. The job works fine locally, but when I submit it to my Flink cluster it fails with a persistent ModuleNotFoundError for the google module:

    File "/tmp/pyflink/7fc48e92-9dc7-468b-9c34-cddc89128621/efe4c9e6-a4e9-41be-a7da-cf7fe8be3165/flink_local.py", line 35, in main
    stream.map(write_to_file, output_type=Types.STRING())
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 291, in map
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 557, in process
  File "<frozen zipimport>", line 259, in load_module
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 23, in <module>
ModuleNotFoundError: No module named 'google'
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1

I've tried various troubleshooting steps, including:

  1. Ensuring the required libraries are installed within my virtual environment.
  2. Double-checking that my Flink job is running within the correct virtual environment.
  3. Checking the Flink configuration to ensure the virtual environment is set up correctly (see the sketch after this list).
  4. Verifying that my imports within the flink_local.py script are accurate and consistent.
  5. Testing the script locally.
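
For step 3, the configuration I mean looks roughly like the following. This is a minimal sketch assuming the job ships a packed virtual environment; the venv.zip name and its internal layout are illustrative:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship the packed virtual environment to the cluster with the job.
env.add_python_archive("venv.zip")
# Point the Python workers at the interpreter inside the extracted archive.
env.set_python_executable("venv.zip/venv/bin/python")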

Despite these efforts, I'm unable to resolve the error. Here is the full script:

from pyflink.common import RestartStrategies
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

def write_to_file(value):
    # Runs inside the Python workers, so the file is written on the worker node.
    with open('flink-output.txt', 'a') as f:
        f.write(value + '\n')
    return value

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 1000))

    properties = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "test-group",
    }
    schema = SimpleStringSchema()

    kafka_consumer = FlinkKafkaConsumer("flink-read", schema, properties)
    kafka_consumer.set_start_from_earliest()

    stream = env.add_source(kafka_consumer)
    # Line 35 from the traceback: map() loads PyFlink's generated protobuf
    # module (flink_fn_execution_pb2), which imports from the google package.
    stream.map(write_to_file, output_type=Types.STRING())

    env.execute("Kafka to File")

if __name__ == '__main__':
    main()
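
For what it's worth, a quick diagnostic like the one below, submitted the same way as the job itself, shows which interpreter the cluster actually launches and whether the google package is visible to it (a standalone check, not part of the job):

import importlib.util
import sys

print(sys.executable)                      # which Python binary is running
print(importlib.util.find_spec("google"))  # None if the 'google' package is missing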

1 Answer


Environment: AWS EMR, PyFlink 1.16.0

Based on this answer, I added a bootstrap script that installs the missing packages system-wide:

sudo pip3 install google
sudo pip3 install google-api-core

This is what fixed it for me. The error comes from PyFlink's generated protobuf module (flink_fn_execution_pb2.py in your traceback), which imports from the google namespace, so those packages have to be present in the Python environment the cluster actually uses, not only the one you tested locally. Depending on your cluster setup, you could instead install them into your virtual environment.
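
If you'd rather keep the dependencies scoped to the job instead of installing them cluster-wide, PyFlink can also ship a requirements file for you. A minimal sketch, assuming a requirements.txt next to the job script (the file name and contents are illustrative):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# requirements.txt would list the missing packages, e.g.:
#   google
#   google-api-core
# Flink distributes the file and installs the packages for its Python workers.
env.set_python_requirements("requirements.txt")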