I'm encountering an issue while using Spark with Elasticsearch from a Jupyter notebook. When calling the save() method to write a DataFrame to Elasticsearch, I get the following error: Py4JJavaError: An error occurred while calling o175.save.: java.lang.NoClassDefFoundError: scala/Product$class
I'm using Windows 10 (Docker Desktop), Spark 3.3.1, Elasticsearch 8.8.0 and Jupyter 4.7.1.
Everything runs in Docker containers. For Spark and Jupyter I use the image jupyter/pyspark-notebook:spark-3.3.1 (documentation). For Elasticsearch I have two containers with the image docker.elastic.co/elasticsearch/elasticsearch:8.8.0, plus one container with kibana:8.8.0. I start the containers with docker-compose up. My docker-compose.yml is based on the docker-compose.yml from the official Elasticsearch documentation.
After adaptation, the docker-compose.yml looks like this:
version: "1.0"
services:
  notebooks:
    build:
      context: data-science/.
      dockerfile: Dockerfile
    container_name: aips-data-science-own
    ports:
      - 7077:7077 # Spark Master
      - 8082:8080 # Spark Master UI - 8082 less likely to conflict
      - 8081:8081 # Spark Worker UI
      - 4040:4040 # Spark UI
      - 4041:4041 # Spark UI
      - 8888:8888 # Jupyter Notebook UI
      - 2345:2345 # Search Webserver
    depends_on:
      - es01
      - es02
    networks:
      - jup-es
    restart: unless-stopped
    #environment:
    #  PYSPARK_SUBMIT_ARGS: '--jars /usr/local/spark/lib/spark-solr-4.0.0-shaded.jar pyspark-shell'
    #  NB_USER: 'aips'
    #  NB_UID: 1010
    #  NB_GID: 1020
    #  CHOWN_HOME: 'yes'
    #  CHOWN_HOME_OPTS: -R
    volumes:
      - type: bind
        source: "./data-science/notebooks/"
        target: "/tmp/notebooks/"
  setup:
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
    user: "0"
    command: >
      bash -c '
        if [ x${ELASTIC_PASSWORD} == x ]; then
          echo "Set the ELASTIC_PASSWORD environment variable in the .env file";
          exit 1;
        elif [ x${KIBANA_PASSWORD} == x ]; then
          echo "Set the KIBANA_PASSWORD environment variable in the .env file";
          exit 1;
        fi;
        if [ ! -f config/certs/ca.zip ]; then
          echo "Creating CA";
          bin/elasticsearch-certutil ca --silent --pem -out config/certs/ca.zip;
          unzip config/certs/ca.zip -d config/certs;
        fi;
        if [ ! -f config/certs/certs.zip ]; then
          echo "Creating certs";
          echo -ne \
          "instances:\n"\
          "  - name: es01\n"\
          "    dns:\n"\
          "      - es01\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: es02\n"\
          "    dns:\n"\
          "      - es02\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: es03\n"\
          "    dns:\n"\
          "      - es03\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          > config/certs/instances.yml;
          bin/elasticsearch-certutil cert --silent --pem -out config/certs/certs.zip --in config/certs/instances.yml --ca-cert config/certs/ca/ca.crt --ca-key config/certs/ca/ca.key;
          unzip config/certs/certs.zip -d config/certs;
        fi;
        echo "Setting file permissions"
        chown -R root:root config/certs;
        find . -type d -exec chmod 750 \{\} \;;
        find . -type f -exec chmod 640 \{\} \;;
        echo "Waiting for Elasticsearch availability";
        until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done;
        echo "Setting kibana_system password";
        until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done;
        echo "All done!";
      '
    healthcheck:
      test: ["CMD-SHELL", "[ -f config/certs/es01/es01.crt ]"]
      interval: 1s
      timeout: 5s
      retries: 120
    networks:
      - jup-es
  es01:
    depends_on:
      setup:
        condition: service_healthy
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata01:/usr/share/elasticsearch/data
    ports:
      - ${ES_PORT}:9200
    environment:
      - node.name=es01
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02
      - discovery.seed_hosts=es02
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es01/es01.key
      - xpack.security.http.ssl.certificate=certs/es01/es01.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es01/es01.key
      - xpack.security.transport.ssl.certificate=certs/es01/es01.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - jup-es
  es02:
    depends_on:
      - es01
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata02:/usr/share/elasticsearch/data
    environment:
      - node.name=es02
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02,es03
      - discovery.seed_hosts=es01,es03
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es02/es02.key
      - xpack.security.http.ssl.certificate=certs/es02/es02.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es02/es02.key
      - xpack.security.transport.ssl.certificate=certs/es02/es02.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - jup-es
  kibana:
    depends_on:
      es01:
        condition: service_healthy
      es02:
        condition: service_healthy
    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
    volumes:
      - certs:/usr/share/kibana/config/certs
      - kibanadata:/usr/share/kibana/data
    ports:
      - ${KIBANA_PORT}:5601
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=https://es01:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
      - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt
    mem_limit: ${MEM_LIMIT}
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - jup-es
volumes:
  certs:
    driver: local
  esdata01:
    driver: local
  esdata02:
    driver: local
  esdata03:
    driver: local
  kibanadata:
    driver: local

networks:
  jup-es:
The corresponding Dockerfile looks like this:
FROM jupyter/pyspark-notebook:spark-3.3.1
USER root
# install gcc, c++, and related dependencies needed for pip to build some Python dependencies
RUN sudo apt-get -y update && apt-get install -y --reinstall build-essential gcc cargo
# Spark dependencies
#ENV SPARK_SOLR_VERSION=4.0.2
ENV SHADED_SOLR_JAR_PATH=/usr/local/spark/lib/elasticsearch-hadoop-8.8.0.jar
# Install Spark-Solr
RUN mkdir -p /usr/local/spark/lib/ && cd /usr/local/spark/lib/ && \
wget -q https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-8.8.0.zip && \
echo "08056c36ab26f2d5ebfd84375d30f6259b35dfb0b7bf9da021a3f66249ac5db50807e5292910a332e3fa91e5704ed0c131b4854e9fd3cf2d3de4b21dd69d0017 elasticsearch-hadoop-8.8.0.zip" | sha512sum -c - && \
chmod a+rwx /usr/local/spark/lib/ && \
unzip elasticsearch-hadoop-8.8.0.zip && \
wget -q https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/8.8.0/elasticsearch-spark-30_2.12-8.8.0.jar
COPY notebooks notebooks
WORKDIR /home/$NB_USER
# Pull Requirements, Install Notebooks
COPY requirements.txt ./
ENV BLIS_ARCH="generic"
RUN python -m pip --no-cache-dir install --upgrade pip && \
pip --no-cache-dir install -r requirements.txt
RUN python -m spacy download en_core_web_sm
RUN pip --no-cache-dir install https://github.com/explosion/spacy-experimental/releases/download/v0.6.1/en_coreference_web_trf-3.4.0a2-py3-none-any.whl
COPY log4j.properties /usr/local/spark/conf/
RUN chown -R $NB_UID:$NB_UID /home/$NB_USER
USER $NB_UID
# Spark Config
ENV SPARK_OPTS="$SPARK_OPTS --driver-java-options=\"-DXlint:none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer -Dlog4j.logger.org.apache.spark.repl.Main=ERROR\" --spark.ui.showConsoleProgress=False --spark.driver.extraLibraryPath=$SHADED_SOLR_JAR_PATH --spark.executor.extraLibraryPath=$SHADED_SOLR_JAR_PATH" \
PYSPARK_SUBMIT_ARGS="-c spark.driver.defaultJavaOptions=\"-DXlint=none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer\" -c spark.ui.showConsoleProgress=False --jars $SHADED_SOLR_JAR_PATH pyspark-shell" \
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-*-src.zip:$PYTHONPATH
WORKDIR notebooks
WORKDIR /tmp/notebooks
CMD jupyter notebook --ip=0.0.0.0 --no-browser --NotebookApp.token='' --NotebookApp.password=''
The full error message is:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[12], line 9
1 df.write.format(
2 "org.elasticsearch.spark.sql"
3 ).option(
4 "es.resource", '%s' % ('humans')
5 ).option(
6 "es.nodes", 'localhost'
7 ).option(
8 "es.port", '9200'
----> 9 ).save()
10 print("Successfully ingested data into Elasticsearch!")
File /usr/local/spark/python/pyspark/sql/readwriter.py:966, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
964 self.format(format)
965 if path is None:
--> 966 self._jwrite.save()
967 else:
968 self._jwrite.save(path)
File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /usr/local/spark/python/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o175.save.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:228)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
... 43 more
The code I run is:
import findspark
from elasticsearch import Elasticsearch
import requests
findspark.init("/usr/local/spark/")
from pyspark.sql import SparkSession, functions as func
es = Elasticsearch(['https://es01:9200'], basic_auth=('elastic', 'mychoosenpassword'), verify_certs=False)
spark = SparkSession.builder.appName('task').getOrCreate()
df = spark.read.format("csv").option("header","true").load("./dataset/path/file.csv").fillna(0)["the first column", "the second column", "..."]
... # some formatting
df.write.format(
"org.elasticsearch.spark.sql"
).option(
"es.resource", '%s' % ('name of the index') # i used an actual index name
).option(
"es.nodes", 'localhost'
).option(
"es.port", '9200'
).save()
print("Successfully ingested data into Elasticsearch!")
The following steps were carried out for error analysis:
The Spark documentation lists the required dependencies:
Spark runs on Java 8/11/17, Scala 2.12/2.13, Python 3.7+ and R 3.5+. Java 8 prior to version 8u201 support is deprecated as of Spark 3.2.0. When using the Scala API, it is necessary for applications to use the same version of Scala that Spark was compiled for. For example, when using Scala 2.13, use Spark compiled for 2.13, and compile code/applications for Scala 2.13 as well.
I checked the Spark and Scala versions by running the command spark-shell --version
in the terminal of the Docker container aips-data-science-own (the Jupyter notebook container).
The output was:
version 3.3.1
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 17.0.5
Branch HEAD
Compiled by user yumwang on 2022-10-15T09:47:01Z
Revision fbbcf9434ac070dd4ced4fb9efe32899c6db12a9
Url https://github.com/apache/spark
Type --help for more information.
The image jupyter/pyspark-notebook:spark-3.3.1 ships OpenJDK 17, so to my understanding the dependencies are fulfilled.
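To double-check this from inside the notebook rather than the shell, the running JVM can also be queried through py4j. A minimal sketch, assuming the SparkSession named spark from the code above:
# Sketch: ask the running Spark JVM which Scala and Java versions it uses.
# Assumes `spark` is the SparkSession created earlier in the notebook.
print(spark.version)                                                  # e.g. 3.3.1
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # e.g. version 2.12.15
print(spark.sparkContext._jvm.System.getProperty("java.version"))     # e.g. 17.0.5
The Scala line is the one the connector jar has to match (the _2.12 vs. _2.13 suffix of the artifact).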
While searching for approaches to solve the problem, I visited the following sites:
Visited site | Diagnosed problem | Solution |
---|---|---|
java-lang-noclassdeffounderror-scala-productclass | wrong Scala version | correct the Scala version used |
Python, Elasticsearch and Apache Spark, Simple data reading | wrong Scala version in the hadoop jar | use the elasticsearch-spark-30_2.13 artifact (Maven Central Repository Search); see the sketch below |
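As I understand the suggestion in the last row, what matters is that the artifact's Scala suffix matches the Scala version Spark was built with, which is 2.12 in my case rather than 2.13. A sketch of how such a coordinate could be passed via the standard spark.jars.packages option (not something I have verified yet):
from pyspark.sql import SparkSession

# Sketch: let Spark resolve the connector from Maven Central instead of a local jar.
# The _2.12 suffix matches the Scala version reported by spark-shell --version above,
# and 8.8.0 matches my Elasticsearch cluster.
spark = (
    SparkSession.builder
    .appName("task")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0")
    .getOrCreate()
)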
After reading the article linked from the last visited site, I tried to additionally provide the "right" .jar file with these lines
# Install Spark-Solr
RUN mkdir -p /usr/local/spark/lib/ && cd /usr/local/spark/lib/ && \
wget -q https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-8.8.0.zip && \
echo "08056c36ab26f2d5ebfd84375d30f6259b35dfb0b7bf9da021a3f66249ac5db50807e5292910a332e3fa91e5704ed0c131b4854e9fd3cf2d3de4b21dd69d0017 elasticsearch-hadoop-8.8.0.zip" | sha512sum -c - && \
chmod a+rwx /usr/local/spark/lib/ && \
unzip elasticsearch-hadoop-8.8.0.zip && \
wget -q https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/8.8.0/elasticsearch-spark-30_2.12-8.8.0.jar
in the Dockerfile.
As a reference for the Python code I used the GitHub repo and other sources like Bulk load to Elastic Search with PySpark.
Edit 06.06.23: I set the environment variable PYSPARK_SUBMIT_ARGS:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/lib/elasticsearch-spark-30_2.12-8.8.0.jar pyspark-shell'
GPT gave me the hint:
The error "ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource" indicates that the Elasticsearch Spark library is not available in the Spark context. It appears that the Elasticsearch Spark library has not been added correctly or is not in the correct version. Here are some steps you can try: ...
Now I got the error "Java gateway process exited before sending the driver its port number".
After a kernel restart, I got the "old" error again.
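For completeness, the jar can presumably also be attached when the session is built instead of via PYSPARK_SUBMIT_ARGS. A sketch using the standard spark.jars option with the path from my Dockerfile, plus a check of which jars the driver actually picked up:
from pyspark.sql import SparkSession

# Sketch: hand the locally downloaded connector jar directly to the session builder.
# The path is the one the wget in my Dockerfile writes to.
spark = (
    SparkSession.builder
    .appName("task")
    .config("spark.jars", "/usr/local/spark/lib/elasticsearch-spark-30_2.12-8.8.0.jar")
    .getOrCreate()
)

# Verify which jars ended up on the driver.
print(spark.sparkContext.getConf().get("spark.jars", ""))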
Any insights or suggestions on how to resolve this issue would be greatly appreciated. Thank you!