
I'm encountering an issue while using Spark with Elasticsearch in a Jupyter notebook. When calling the save() method, I get the following error: Py4JJavaError: An error occurred while calling o175.save.: java.lang.NoClassDefFoundError: scala/Product$class.

I'm using Windows 10 (Docker Desktop), Spark version 3.3.1, Elasticsearch version 8.8.0, and Jupyter version 4.7.1.

I run everything in Docker containers. For Spark and Jupyter I use the image jupyter/pyspark-notebook:spark-3.3.1 (documentation). For Elasticsearch I have two containers with the image docker.elastic.co/elasticsearch/elasticsearch:8.8.0, plus one container with kibana:8.8.0. I start the containers with docker-compose up. My docker-compose.yml is based on the docker-compose.yml from the official Elasticsearch documentation.

After adaptation, the docker-compose.yml looks like this:

version: "1.0"
services:
  notebooks:
    build:
      context: data-science/.
      dockerfile: Dockerfile
    container_name: aips-data-science-own
    ports:
      - 7077:7077 # Spark Master
      - 8082:8080 # Spark Master UI - 8082 less likely to conflict
      - 8081:8081 # Spark Worker UI
      - 4040:4040 # Spark UI
      - 4041:4041 # Spark UI
      - 8888:8888 # Jupyter Notebook UI
      - 2345:2345 # Search Webserver
    depends_on:
      - es01
      - es02
    networks:
      - jup-es
    restart: unless-stopped
    #environment:
    #PYSPARK_SUBMIT_ARGS: '--jars /usr/local/spark/lib/spark-solr-4.0.0-shaded.jar pyspark-shell'
    #NB_USER: 'aips'
    #NB_UID: 1010
    #NB_GID: 1020
    #CHOWN_HOME: 'yes'
    #CHOWN_HOME_OPTS: -R
    volumes:
      - type: bind
        source: "./data-science/notebooks/"
        target: "/tmp/notebooks/"

  setup:
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
    user: "0"
    command: >
      bash -c '
        if [ x${ELASTIC_PASSWORD} == x ]; then
          echo "Set the ELASTIC_PASSWORD environment variable in the .env file";
          exit 1;
        elif [ x${KIBANA_PASSWORD} == x ]; then
          echo "Set the KIBANA_PASSWORD environment variable in the .env file";
          exit 1;
        fi;
        if [ ! -f config/certs/ca.zip ]; then
          echo "Creating CA";
          bin/elasticsearch-certutil ca --silent --pem -out config/certs/ca.zip;
          unzip config/certs/ca.zip -d config/certs;
        fi;
        if [ ! -f config/certs/certs.zip ]; then
          echo "Creating certs";
          echo -ne \
          "instances:\n"\
          "  - name: es01\n"\
          "    dns:\n"\
          "      - es01\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: es02\n"\
          "    dns:\n"\
          "      - es02\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: es03\n"\
          "    dns:\n"\
          "      - es03\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          > config/certs/instances.yml;
          bin/elasticsearch-certutil cert --silent --pem -out config/certs/certs.zip --in config/certs/instances.yml --ca-cert config/certs/ca/ca.crt --ca-key config/certs/ca/ca.key;
          unzip config/certs/certs.zip -d config/certs;
        fi;
        echo "Setting file permissions"
        chown -R root:root config/certs;
        find . -type d -exec chmod 750 \{\} \;;
        find . -type f -exec chmod 640 \{\} \;;
        echo "Waiting for Elasticsearch availability";
        until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done;
        echo "Setting kibana_system password";
        until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done;
        echo "All done!";
      '
    healthcheck:
      test: ["CMD-SHELL", "[ -f config/certs/es01/es01.crt ]"]
      interval: 1s
      timeout: 5s
      retries: 120
    networks:
      - jup-es

  es01:
    depends_on:
      setup:
        condition: service_healthy
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata01:/usr/share/elasticsearch/data
    ports:
      - ${ES_PORT}:9200
    environment:
      - node.name=es01
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02
      - discovery.seed_hosts=es02
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es01/es01.key
      - xpack.security.http.ssl.certificate=certs/es01/es01.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es01/es01.key
      - xpack.security.transport.ssl.certificate=certs/es01/es01.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - jup-es

  es02:
    depends_on:
      - es01
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata02:/usr/share/elasticsearch/data
    environment:
      - node.name=es02
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02,es03
      - discovery.seed_hosts=es01,es03
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es02/es02.key
      - xpack.security.http.ssl.certificate=certs/es02/es02.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es02/es02.key
      - xpack.security.transport.ssl.certificate=certs/es02/es02.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - jup-es

  kibana:
    depends_on:
      es01:
        condition: service_healthy
      es02:
        condition: service_healthy
    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
    volumes:
      - certs:/usr/share/kibana/config/certs
      - kibanadata:/usr/share/kibana/data
    ports:
      - ${KIBANA_PORT}:5601
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=https://es01:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
      - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt
    mem_limit: ${MEM_LIMIT}
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - jup-es

volumes:
  certs:
    driver: local
  esdata01:
    driver: local
  esdata02:
    driver: local
  esdata03:
    driver: local
  kibanadata:
    driver: local

networks:
  jup-es:
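
Since the notebook container and es01 share the jup-es network, Elasticsearch is reachable from the notebook via the Python client. A minimal reachability check I can run there (a sketch, using the same credentials as in my code further below):

from elasticsearch import Elasticsearch

# Minimal reachability check from inside the notebook container; verify_certs=False
# because the cluster uses the self-signed certificates generated by the setup service.
es = Elasticsearch(
    ["https://es01:9200"],
    basic_auth=("elastic", "mychoosenpassword"),
    verify_certs=False,
)
print(es.info())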

The corresponding Dockerfile looks like this:

FROM jupyter/pyspark-notebook:spark-3.3.1

USER root

# Install gcc, C++, and related dependencies needed for pip to build some Python dependencies
RUN sudo apt-get -y update && apt-get install -y --reinstall build-essential gcc cargo

# Spark dependencies
#ENV SPARK_SOLR_VERSION=4.0.2
ENV SHADED_SOLR_JAR_PATH=/usr/local/spark/lib/elasticsearch-hadoop-8.8.0.jar

# Install the elasticsearch-hadoop distribution and the elasticsearch-spark connector
RUN mkdir -p /usr/local/spark/lib/ && cd /usr/local/spark/lib/ && \
    wget -q https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-8.8.0.zip && \
    echo "08056c36ab26f2d5ebfd84375d30f6259b35dfb0b7bf9da021a3f66249ac5db50807e5292910a332e3fa91e5704ed0c131b4854e9fd3cf2d3de4b21dd69d0017  elasticsearch-hadoop-8.8.0.zip" | sha512sum -c - && \
    chmod a+rwx /usr/local/spark/lib/ && \
    unzip elasticsearch-hadoop-8.8.0.zip && \
    wget -q https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/8.8.0/elasticsearch-spark-30_2.12-8.8.0.jar

COPY notebooks notebooks

WORKDIR /home/$NB_USER

# Pull Requirements, Install Notebooks
COPY requirements.txt ./
ENV BLIS_ARCH="generic" 

RUN python -m pip --no-cache-dir install --upgrade pip && \
  pip --no-cache-dir install -r requirements.txt

RUN python -m spacy download en_core_web_sm
RUN pip --no-cache-dir install https://github.com/explosion/spacy-experimental/releases/download/v0.6.1/en_coreference_web_trf-3.4.0a2-py3-none-any.whl

COPY log4j.properties /usr/local/spark/conf/

RUN chown -R $NB_UID:$NB_UID /home/$NB_USER
USER $NB_UID

# Spark Config
ENV SPARK_OPTS="$SPARK_OPTS --driver-java-options=\"-DXlint:none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer -Dlog4j.logger.org.apache.spark.repl.Main=ERROR\" --spark.ui.showConsoleProgress=False --spark.driver.extraLibraryPath=$SHADED_SOLR_JAR_PATH --spark.executor.extraLibraryPath=$SHADED_SOLR_JAR_PATH" \
    PYSPARK_SUBMIT_ARGS="-c spark.driver.defaultJavaOptions=\"-DXlint=none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer\" -c spark.ui.showConsoleProgress=False --jars $SHADED_SOLR_JAR_PATH pyspark-shell" \
    PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-*-src.zip:%PYTHONPATH%

WORKDIR notebooks

WORKDIR /tmp/notebooks

CMD jupyter notebook --ip=0.0.0.0 --no-browser --NotebookApp.token='' --NotebookApp.password=''
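
To see which jars the build actually places under /usr/local/spark/lib/ (and which Scala suffix their file names carry), I can run a quick check from the notebook. A sketch, using the paths from the Dockerfile above:

import glob

# List everything the Dockerfile unpacked/downloaded into /usr/local/spark/lib/ so the
# Scala suffix in the jar names (_2.11 vs _2.12/_2.13) can be inspected.
for path in sorted(glob.glob("/usr/local/spark/lib/**/*.jar", recursive=True)):
    print(path)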

The full error message is:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[12], line 9
      1 df.write.format(
      2     "org.elasticsearch.spark.sql"
      3 ).option(
      4     "es.resource", '%s' % ('humans')
      5 ).option(
      6     "es.nodes", 'localhost'
      7 ).option(
      8     "es.port", '9200'
----> 9 ).save()
     10 print("Successfully ingested data into Elasticsearch!")

File /usr/local/spark/python/pyspark/sql/readwriter.py:966, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
    964     self.format(format)
    965 if path is None:
--> 966     self._jwrite.save()
    967 else:
    968     self._jwrite.save(path)

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o175.save.
: java.lang.NoClassDefFoundError: scala/Product$class
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:228)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
    ... 43 more

The code I run is:

import findspark
from elasticsearch import Elasticsearch
import requests

findspark.init("/usr/local/spark/")
from pyspark.sql import SparkSession, functions as func

es = Elasticsearch(['https://es01:9200'], basic_auth=('elastic', 'mychoosenpassword'), verify_certs=False)

spark = SparkSession.builder.appName('task').getOrCreate()

df = spark.read.format("csv").option("header","true").load("./dataset/path/file.csv").fillna(0)["the first column", "the second column", "..."]

... # some formatting

df.write.format(
    "org.elasticsearch.spark.sql"
).option(
    "es.resource", '%s' % ('name of the index') # i used an actual index name
).option(
    "es.nodes", 'localhost'
).option(
    "es.port", '9200'
).save()
print("Successfully ingested data into Elasticsearch!")

The following steps were carried out for error analysis:

The Spark documentation lists the dependencies:

Spark runs on Java 8/11/17, Scala 2.12/2.13, Python 3.7+ and R 3.5+. Java 8 prior to version 8u201 support is deprecated as of Spark 3.2.0. When using the Scala API, it is necessary for applications to use the same version of Scala that Spark was compiled for. For example, when using Scala 2.13, use Spark compiled for 2.13, and compile code/applications for Scala 2.13 as well.

I checked the Spark and Scala versions by running the command spark-shell --version in the terminal of the Docker container aips-data-science-own (the Jupyter notebook container). The output was:

version 3.3.1               
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 17.0.5
Branch HEAD
Compiled by user yumwang on 2022-10-15T09:47:01Z
Revision fbbcf9434ac070dd4ced4fb9efe32899c6db12a9
Url https://github.com/apache/spark
Type --help for more information.

The image jupyter/pyspark-notebook:spark-3.3.1 ships OpenJDK 17. To my understanding, the dependencies are therefore fulfilled.
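
Since spark-shell and the PySpark kernel do not necessarily see the same classpath, I can also check the Scala version from inside the notebook session itself. A sketch via the py4j gateway (my understanding is that scala.util.Properties exposes versionString):

# Print the Scala version of the JVM that PySpark is actually talking to
# (assumption: `spark` is the active SparkSession from the code above).
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
# Expected for this image: "version 2.12.15"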

While searching for approaches to solve the problem, I visited the following sites:

Visited site: java-lang-noclassdeffounderror-scala-productclass
Diagnosed problem: wrong Scala version
Solution: correct the Scala version used

Visited site: Python, Elasticsearch and Apache Spark, Simple data reading
Diagnosed problem: wrong Scala version in the hadoop jar
Solution: use the elasticsearch-spark-30_2.13 artifact (Maven Central Repository Search)

After reading the article linked from the last visited site, I tried to additionally provide the "right" .jar file with the following lines in the Dockerfile:

# Install the elasticsearch-hadoop distribution and the elasticsearch-spark connector
RUN mkdir -p /usr/local/spark/lib/ && cd /usr/local/spark/lib/ && \
    wget -q https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-8.8.0.zip && \
    echo "08056c36ab26f2d5ebfd84375d30f6259b35dfb0b7bf9da021a3f66249ac5db50807e5292910a332e3fa91e5704ed0c131b4854e9fd3cf2d3de4b21dd69d0017  elasticsearch-hadoop-8.8.0.zip" | sha512sum -c - && \
    chmod a+rwx /usr/local/spark/lib/ && \
    unzip elasticsearch-hadoop-8.8.0.zip && \
    wget -q https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/8.8.0/elasticsearch-spark-30_2.12-8.8.0.jar

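For reference, instead of downloading the jar manually, Spark can also resolve the connector matching its Scala build (2.12 for this image) from Maven Central via spark.jars.packages. A minimal sketch (assuming the container has internet access when the session starts):

from pyspark.sql import SparkSession

# Let Spark pull the connector that matches the Scala 2.12 build of this image
# directly from Maven Central instead of shipping a local jar.
spark = (
    SparkSession.builder
    .appName("task")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0")
    .getOrCreate()
)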

As a reference for the Python code, I used the GitHub repo and other sources like "Bulk load to Elastic Search with PySpark".

Edit 06.06.23: I set the environment variable PYSPARK_SUBMIT_ARGS:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/lib/elasticsearch-spark-30_2.12-8.8.0.jar pyspark-shell'
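
For completeness: my understanding is that this variable is only read when the Java gateway is launched, so it has to be set before findspark.init() and SparkSession.builder.getOrCreate() run. A sketch of the ordering (assumption: the jar path exists inside the notebook container, as per the Dockerfile):

import os

# PYSPARK_SUBMIT_ARGS is only picked up when the JVM gateway is started, so set it
# before anything creates the SparkSession.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/lib/elasticsearch-spark-30_2.12-8.8.0.jar pyspark-shell'

import findspark
findspark.init("/usr/local/spark/")

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('task').getOrCreate()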

GPT gave me the hint:

The error "ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource" indicates that the Elasticsearch Spark library is not available in the Spark context. It appears that the Elasticsearch Spark library has not been added correctly or is not in the correct version. Here are some steps you can try: ...

Now I got the error "Java gateway process exited before sending the driver its port number".

After a kernel restart, I got the "old" error again.

Any insights or suggestions on how to resolve this issue would be greatly appreciated. Thank you!
