1

I could run this example in the terminal. My terminal command is:

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 examples/src/main/python/sql/streaming/structured_kafka_wordcount.py localhost:9092 subscribe test

Now I wants to run it in Juypter python notebook. I tried to follow this (I could run the code in the link). But in my case, it failed. The following is my code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

bootstrapServers = "localhost:9092"
subscribeType = "subscribe"
topics = "test"

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")

# Split the lines into words
words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()

The error message is:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-0344129c7d54> in <module>()
     14 
     15 # Create DataSet representing the stream of input lines from kafka
---> 16 lines = spark    .readStream    .format("kafka")    .option("kafka.bootstrap.servers", bootstrapServers)    .option(subscribeType, topics)    .load()    .selectExpr("CAST(value AS STRING)")
     ...

Py4JJavaError: An error occurred while calling o31.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
    at java.base/java.lang.ClassLoader.defineClass1(Native Method)
    ...

Any ideas? Thanks!


Update:

I tried to follow the answer but still got error. The following is my procedure. I searched that there are two kernel.json, they are

~/anaconda3/pkgs/ipykernel-4.6.1-py36h3208c25_0/share/jupyter/kernels/python3/kernel.json
~/anaconda3/share/jupyter/kernels/python3/kernel.json

Then I updated them all with the following content:

{
    "display_name": "PySpark",
    "language": "python",
    "argv": [ "</usr>/anaconda3/bin/python", "-m", "ipykernel", "-f", "  {connection_file}" ],
    "env": {
        "SPARK_HOME": "</usr>/projects/spark-2.3.0",
        "PYSPARK_PYTHON": "</usr>/anaconda3/bin/python",
        "PYTHONPATH": "</usr>/projects/spark-2.3.0/spark/python/:</usr>/projects/spark-2.3.0/spark/python/lib/py4j-0.10.6-src.zip",
        "PYTHONSTARTUP": "</usr>/projects/spark-2.3.0/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS":  "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"
    }
}

Then I got error as follows:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/<usr>/projects/spark-2.3.0/assembly/target/scala-2.11/jars/hadoop-auth-2.6.5.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
kww
  • 411
  • 4
  • 12
  • 21
  • `PYSPARK_SUBMIT_ARGS` will work if and only if JVM is initialized after it is set. – zero323 Apr 16 '18 at 16:30
  • @user6910411 I used it following [this link](https://stackoverflow.com/questions/35946868/adding-custom-jars-to-pyspark-in-jupyter-notebook). It worked in that example. – kww Apr 16 '18 at 16:58

2 Answers2

2

As @user6910411 said PYSPARK_SUBMIT_ARGS can only work before the instantiation of your sparkContext.

In the example you followed, they probably use a python Kernel for their jupyter notebook and they instantiate a spark context using the pyspark library.

I'm guessing you're using a pyspark kernel, hence:

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

won't start a sparkSession but only fetch the already existing one.

You can pass arguments to the spark-submit ran by jupyter in your kernel.json file so the libraries get loaded every time you run a new notebook:

{
    "display_name": "PySpark",
    "language": "python",
    "argv": [ "/opt/anaconda3/bin/python", "-m", "ipykernel", "-f", "  {connection_file}" ],
    "env": {
        "SPARK_HOME": "/usr/iop/current/spark-client",
        "PYSPARK_PYTHON": "/opt/anaconda3/bin/python3",
        "PYTHONPATH": "/usr/iop/current/spark-client/python/:/usr/iop/current/spark-client/python/lib/py4j-0.9-src.zip",
        "PYTHONSTARTUP": "/usr/iop/current/spark-client/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS":  "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"
  }
}
MaFF
  • 9,551
  • 2
  • 32
  • 41
  • I followed your suggestions, but still got error (I updated with the error) – kww Apr 18 '18 at 19:26
  • Only add the `"PYSPARK_SUBMIT_ARGS"` part to your existing pyspark `kernel.json` file (if there is one). To list the available kernels you can call `jupyter kernelspec list` – MaFF Apr 19 '18 at 06:40
1

This is how I can config to run PySpark (verison with scala 2.12 Spark 3.2.1) Structure Streaming with Kafka on jupyter lab

First,I download 5 jars files and I put them in the folder /jars under my current project folder (just for local run I think):

  • spark-sql-kafka-0-10_2.12-3.2.1.jar
  • kafka-clients-2.1.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar
  • commons-pool2-2.8.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.1.jar

The value of config spark.jars look like this "<path-to-jar/test1.jar>,<path-to-jar/test2.jar>"

This is the actual code:

spark_jars =  ("{},{},{},{},{}".format(os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",  
                                      os.getcwd() + "/jars/kafka-clients-2.1.1.jar", 
                                      os.getcwd() + "/jars/spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar", 
                                      os.getcwd() + "/jars/commons-pool2-2.8.0.jar",  
                                      os.getcwd() + "/jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar"))




spark = SparkSession.builder.config("spark.jars", spark_jars).appName("Structured_Redpanda_WordCount").getOrCreate()



spark.conf.set("spark.sql.shuffle.partitions", 1
Rafael Z. B. Bravo
  • 1,022
  • 10
  • 23