I was able to resolve this, at least in "Session" mode, by setting the config parameters below in the flink-conf.yaml file.
env.java.opts: "-Djava.library.path=<<path to libraries>>"
containerized.master.env.LD_LIBRARY_PATH: "<<path to libraries>>"
containerized.taskmanager.env.LD_LIBRARY_PATH: "<<path to libraries>>"
You also need to use StreamExecutionEnvironment.registerCachedFile to make the files extracted on the JobManager available to the TaskManagers involved.
On the driver side:
StreamExecutionEnvironment.getExecutionEnvironment.registerCachedFile(directoryWhereFilesAreExtracted, "somekey")
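For concreteness, here is a minimal sketch of the driver side. The path /tmp/native-libs is a hypothetical stand-in for the directory where your libraries were extracted; "somekey" is the cache key the task managers will use to look the directory up later.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Driver {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // "/tmp/native-libs" is a hypothetical path; replace it with the
    // directory where the libraries were actually extracted.
    env.registerCachedFile("/tmp/native-libs", "somekey")
    // ... define the rest of the pipeline here ...
    env.execute("native-lib-pipeline")
  }
}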
On the task manager side, you can access these cached files and copy them into the directory configured in flink-conf.yaml, so that they are included in the library path at execution time:
getRuntimeContext().getDistributedCache().getFile("somekey")
To access the RuntimeContext, you need to extend a rich function such as RichMapFunction.
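As an illustration, here is a minimal sketch of such a rich function. The key "somekey" matches the one registered on the driver; /mnt/native-libs is a hypothetical stand-in for the directory you configured on java.library.path / LD_LIBRARY_PATH in flink-conf.yaml.

import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class NativeLibLoadingMapper extends RichMapFunction[String, String] {

  override def open(parameters: Configuration): Unit = {
    // Look up the directory registered on the driver under "somekey".
    val cachedDir: File = getRuntimeContext.getDistributedCache.getFile("somekey")
    // Hypothetical target: the directory that flink-conf.yaml puts on
    // the library path of the task manager JVM.
    val target = Paths.get("/mnt/native-libs")
    Files.createDirectories(target)
    // Copy every cached library into the library-path directory.
    Option(cachedDir.listFiles()).getOrElse(Array.empty).foreach { lib =>
      Files.copy(lib.toPath, target.resolve(lib.getName), StandardCopyOption.REPLACE_EXISTING)
    }
  }

  // The native libraries are now loadable, e.g. via System.loadLibrary.
  override def map(value: String): String = value
}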
Hope this helps anyone looking for an approach to this kind of scenario.
Update:
With all the above changes, the first run of the Flink pipeline still complained that the library could not be found, even though I verified that the libraries were present in the directory where the distributed cache is extracted.
Subsequent runs after the first failure were successful. I am not sure why I was seeing this behavior.
Update:
Making sure that the directory where we extract the libraries already exists when the EMR cluster is created fixed it, and it worked like a charm. I created this directory by configuring a bootstrap action.
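For reference, the bootstrap action can be as small as a script that pre-creates the directory on every node before Flink starts; the path is the same hypothetical stand-in used above, and the permissive mode is for illustration only.

#!/bin/bash
# Hypothetical EMR bootstrap action: pre-create the native library
# directory so it exists before the Flink processes come up.
sudo mkdir -p /mnt/native-libs
# Permissive for illustration; tighten ownership/permissions as needed
# so the task manager process can write to it.
sudo chmod 777 /mnt/native-libs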