
I'm having problems understanding how to connect Kafka and PySpark.

I have a Kafka installation on Windows 10 with a topic nicely streaming data. I've installed PySpark, which runs properly - I'm able to create a test DataFrame without a problem.

But when I try to connect to the Kafka stream it gives me this error:

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
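
For context, the read I'm attempting looks roughly like this (the topic name and bootstrap server below are placeholders, not my actual values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-test").getOrCreate()

# this is the call that raises the AnalysisException above
df = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "my-topic")\
    .load()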

The Spark documentation is not really helpful - it says: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

For Python applications, you need to add this above library and its dependencies when deploying your application. See the Deploying subsection below.

And then when you go to the Deploying section it says:

As with any Spark applications, spark-submit is used to launch your application. spark-sql-kafka-0-10_2.12 and its dependencies can be directly added to spark-submit using --packages, such as, ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ...

I'm developing an app, I don't want to deploy it. Where and how do I add these dependencies if I'm developing a PySpark app?

I've tried several tutorials and ended up more confused.

I saw an answer saying

"You need to add kafka-clients JAR to your --packages" (so-answer).

A few more steps would be useful, because for someone who is new this is unclear.

versions:

  • kafka 2.13-2.8.1
  • spark 3.1.2
  • java 11.0.12

All environment variables and paths are correctly set.

EDIT

I've loaded:

   os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'

as suggested, but I'm still getting the same error. I've triple-checked the Kafka, Scala and Spark versions and tried various combinations, but it didn't work; I'm still getting the same error:

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

EDIT 2

I installed the latest Spark 3.2.0 and Hadoop 3.3.1 and Kafka version kafka_2.12-2.8.1. I changed all environment variables and tested Spark and Kafka - both are working properly.

My environment variable looks like this now:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'

Still no luck, I get the same error :(

Hrvoje

1 Answer


The Spark documentation is not really helpful - it says ... artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

Yes, that is correct... but you're looking at the documentation for the latest version of Spark.

Instead, you've mentioned

versions:

  • spark 3.1.2

Have you tried looking at the version-specific docs?

In other words, you want the matching spark-sql-kafka version of 3.1.2.

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

Or in Python,

from pyspark.sql import SparkSession

scala_version = '2.12'
spark_version = '3.1.2'
# TODO: ensure the two values above match your installed Spark and Scala versions
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.1'
]
spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .getOrCreate()
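
Once the packages resolve, a quick way to confirm the source is found is to read from the topic and dump it to the console - a minimal sketch, where the broker address and topic name are placeholders you'd swap for your own:

df = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "my-topic")\
    .load()

# Kafka delivers key/value as binary; cast to strings to inspect them
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream\
    .format("console")\
    .start()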

Or with an env-var

import os

spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)

# init spark here
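
One thing to watch: PYSPARK_SUBMIT_ARGS is only read when the JVM is launched, so the os.environ line has to run before the SparkSession (or findspark.init()) is created; setting it afterwards has no effect.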

need to add this above library and its dependencies

As you found in my previous answer, also append the kafka-clients package using a comma-separated list.

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1
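
In builder form, "appending" just means adding a second Maven coordinate to the same comma-separated spark.jars.packages value - a sketch assuming Spark 3.1.2 and kafka-clients 2.8.1, matching the --packages line above:

from pyspark.sql import SparkSession

# both coordinates go into one comma-separated string
packages = ','.join([
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2',
    'org.apache.kafka:kafka-clients:2.8.1',
])

spark = SparkSession.builder\
    .master('local')\
    .appName('kafka-example')\
    .config('spark.jars.packages', packages)\
    .getOrCreate()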


I'm developing an app, I don't want to deploy it.

"Deploy" is Spark terminology. Running locally is still a "deployment"

OneCricketeer
  • Thank you so much for this! One thing is still not clear to me - how do I 'append the kafka-clients package'? Where is that package? – Hrvoje Dec 16 '21 at 16:45
  • Also https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html says: "Do not manually add dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has the appropriate transitive dependencies already, and different versions may be incompatible in hard to diagnose ways." – Hrvoje Dec 16 '21 at 16:48
  • Last I checked, PySpark doesn't pull the transitive dependencies (thus the reported error in the answer you linked); it only does so when using a tool like SBT / Maven / Gradle. As for "where" - Maven Central. You "append" a string to the packages argument, as shown. – OneCricketeer Dec 16 '21 at 20:44
  • I've tried as you suggested - see edit on my question. It's still not working :( – Hrvoje Dec 17 '21 at 05:15
  • You put the `{}` formatter at the wrong spot. It only goes on the Spark dependency, not at the very end. – OneCricketeer Dec 17 '21 at 14:55
  • I've edited answer - not working again. Problem is that my Kafka is for Scala version 2.13 and Spark requires 2.12. I'll reinstall kafka and try again... – Hrvoje Dec 18 '21 at 08:38
  • I'm using latest Spark 3.2.0 and kafka_2.12-2.8.1 but still no luck. – Hrvoje Dec 18 '21 at 08:47
  • The scala api version for the Kafka broker shouldn't matter. As long as your Spark version and **Spark's** scala version are the same as the packages argument, then it'll work. Where are you putting `os.environ` call in relation to the Spark code? Are you importing `findspark`, perhaps? – OneCricketeer Dec 18 '21 at 15:01
  • @user12 It's correct, and has been answered [other times](https://stackoverflow.com/questions/58723314/pyspark-failed-to-find-data-source-kafka/58723724#58723724) too. – OneCricketeer Dec 21 '21 at 02:14