I'm working on a PyFlink job that reads data from a Kafka topic using the FlinkKafkaConsumer connector. However, I'm hitting a persistent ModuleNotFoundError for the google module when I run the job on my Flink cluster. The job works fine locally, but when I submit it to the cluster, I receive the following error:
File "/tmp/pyflink/7fc48e92-9dc7-468b-9c34-cddc89128621/efe4c9e6-a4e9-41be-a7da-cf7fe8be3165/flink_local.py", line 35, in main
stream.map(write_to_file, output_type=Types.STRING())
File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 291, in map
File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 557, in process
File "<frozen zipimport>", line 259, in load_module
File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 23, in <module>
ModuleNotFoundError: No module named 'google'
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
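For what it's worth, flink_fn_execution_pb2.py is importing google.protobuf, so I assume the missing google module is the one provided by the protobuf package. The check below succeeds inside my local virtual environment, which makes me suspect the cluster's Python workers are running a different interpreter:

# Quick sanity check; this succeeds in the venv on my machine.
# I would expect it to fail in whatever interpreter the TaskManagers use.
import google.protobuf
print(google.protobuf.__version__)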
I've tried various troubleshooting steps, including:
- Ensuring the required libraries are installed in my virtual environment.
- Double-checking that the Flink job actually runs inside that virtual environment (see the configuration sketch after this list).
- Checking the Flink configuration to confirm the virtual environment is set up correctly.
- Verifying that the imports in the flink_local.py script are accurate and consistent.
- Testing the script locally, where it runs without errors.
Despite these efforts, I'm unable to resolve the error.
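In case the configuration is the culprit, this is roughly how I understand PyFlink is supposed to be pointed at a virtual environment; the paths below are illustrative, not my actual setup:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Option 1: the venv already exists at the same path on every node.
env.set_python_executable("/opt/venv/bin/python")  # illustrative path

# Option 2: ship the venv as an archive so each TaskManager extracts it.
# venv.zip is assumed to contain a top-level "venv" directory.
# env.add_python_archive("venv.zip")
# env.set_python_executable("venv.zip/venv/bin/python")

The same thing should be achievable via python.executable and python.client.executable in flink-conf.yaml, if I'm reading the docs correctly. Here is the full flink_local.py script: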
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common import RestartStrategies


def write_to_file(value):
    # Append each record to a local file on the worker, then pass the
    # record through unchanged.
    with open('flink-output.txt', 'a') as f:
        f.write(value + '\n')
    return value


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 1000))

    properties = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "test-group",
    }
    schema = SimpleStringSchema()
    kafka_consumer = FlinkKafkaConsumer("flink-read", schema, properties)
    kafka_consumer.set_start_from_earliest()

    stream = env.add_source(kafka_consumer)
    # This map() call is the line the traceback points at: it is what
    # pulls in flink_fn_execution_pb2 and, with it, google.protobuf.
    stream.map(write_to_file, output_type=Types.STRING())

    env.execute("Kafka to File")


if __name__ == '__main__':
    main()
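And this is how I submit the job to the cluster; the interpreter paths are illustrative, and -pyexec / -pyclientexec are the flags I understand should select the worker-side and client-side interpreters:

flink run \
  -py flink_local.py \
  -pyexec /opt/venv/bin/python \
  -pyclientexec /opt/venv/bin/python

Any pointers on what I might be missing would be appreciated.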