
I am trying to include the org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 package as part of my Spark code (via the SparkSession builder). I understand that I can download the JAR myself and include it, but I would like to figure out why the following is not working as expected:

from pyspark.sql import SparkSession
import pyspark
import json

if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local") \
        .appName("App Name") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3") \
        .getOrCreate()

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "first_topic") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df \
        .writeStream \
        .format("console") \
        .outputMode("update") \
        .start()

    query.awaitTermination()

When I run the job:

spark-submit main.py

I receive the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o48.load.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

If I instead include the packages via the --packages flag, the dependencies are downloaded and the code runs as expected:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 main.py

The code also works if I open the PySpark shell and paste the code above. Is there a reason that spark-submit ignores the configuration?


3 Answers


I think that configurations like "spark.jars.packages" should be set either in spark-defaults.conf or passed as command-line arguments; setting them at runtime shouldn't work.
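For example, with the same Kafka package as in the question, either of the following should be equivalent to the working spark-submit invocation (the spark-defaults path assumes a standard $SPARK_HOME layout):

# in $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3

# or on the command line, either of
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 main.py
spark-submit --conf spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 main.py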


Against better judgement

I remember some people claiming that something like this worked for them, but I would say the dependency was probably already there (installed in the local repo) and just got loaded.

import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf()
conf.set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3")

spark = SparkSession.builder \
    .master("local") \
    .appName("App Name") \
    .config(conf=conf) \
    .getOrCreate()
  • Does that mean the answer from https://stackoverflow.com/a/45868189/9667721 is not valid either? – Brandon Jun 01 '20 at 00:13
  • @Brandon good question. What do you mean by "valid"? If it works, it works, but I am against this approach. Bundling your app with its dependencies, sure, why not. Documenting your app's requirements, good as well. But making your app download dependencies on the fly? What if your proxy settings change and you suddenly need to debug why your app no longer runs? – Saša Zejnilović Jun 01 '20 at 07:28
  • I agree with your point about not downloading dependencies on the fly. The dependencies should ideally be downloaded beforehand and provided to the application. I am just curious as to why the configuration is ignored. Spark should display some kind of warning or error as opposed to ignoring the statement entirely. I also find it odd that the post I mentioned in my previous comment makes it seem as if this approach should work. I guess its use must have been deprecated. – Brandon Jun 01 '20 at 13:30
  • After battling through a whole day of debugging, I'm asking myself the same question. I could not make the code work either by setting `.config` options explicitly in the builder or by specifying a separate `conf` variable and using it in the builder. Curiously, using the `--packages` option with spark-submit did download the package and its dependencies from Maven, but it didn't include them in the classpath, which left me with the same bug. The only solution was to copy the JARs directly into the Spark folder. Any rationale as to why PySpark behaves this way and why it isn't documented clearly? – Javier Diego-Fernández Aug 24 '21 at 16:38

When you run spark-submit, it already creates a SparkSession that is reused by your code, so you have to provide everything through spark-submit itself.

However, you do not actually need to use spark-submit to run your Spark code. Assuming your main function looks like this:

def main():
    spark = SparkSession.builder.config(...).getOrCreate()
    # your spark code below
    ...

if __name__ == "__main__":
    main()

You can run this code just via python:

> python ./my_spark_script.py

This will run your program correctly.
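For reference, here is a minimal sketch of such a script, assuming the same package, broker, and topic as in the question. As far as I can tell, this works because the JVM is launched by the Python process itself, so the builder configuration (including spark.jars.packages) is forwarded to the underlying spark-submit call before the driver starts:

from pyspark.sql import SparkSession


def main():
    # spark.jars.packages takes effect here because the JVM has not been
    # launched yet when the script is started with plain `python`.
    spark = SparkSession.builder \
        .master("local") \
        .appName("App Name") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3") \
        .getOrCreate()

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "first_topic") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df.writeStream \
        .format("console") \
        .outputMode("update") \
        .start()

    # Keep the streaming query alive until it is stopped externally.
    query.awaitTermination()


if __name__ == "__main__":
    main()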

George

I faced the same problem; after googling, I found this link:

https://issues.apache.org/jira/browse/SPARK-21752

According to Sean R. Owen (@srowen): "At that point, your app has already launched. You can't change the driver classpath."
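In other words, the builder setting is still recorded in the driver's configuration under spark-submit; it simply arrives after spark-submit has already resolved dependencies and started the JVM. A small diagnostic sketch of my own (not from the linked issue) that illustrates this:

# run with: spark-submit main.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3") \
    .getOrCreate()

# The value shows up in the configuration...
print(spark.sparkContext.getConf().get("spark.jars.packages"))

# ...but the driver classpath was fixed when spark-submit launched the JVM,
# so this still fails with "Failed to find data source: kafka".
spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "first_topic") \
    .load()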

Trolie