
I want to read the data sent by a Kafka producer, but I ran into the following problem:

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

Following the error message, I searched the official documentation and this site, and found others with the same error: link1. However, those approaches did not solve my problem, so I would like to ask whether there is a better way to fix it. My code and version information are attached below.

My code:

from kafka import KafkaProducer
from pyspark.python.pyspark.shell import spark
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
import json
import sys
import os
import findspark

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1'
findspark.init()
def ReadingDataToKafka():
    spark_conf = SparkConf().setAppName("KafkaWordCount")
    sc = SparkContext.getOrCreate(conf=spark_conf)
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///tmp/ZHYCargeProject")
    topics = 'sex'
    topicAry = topics.split(",")
    topicMap = {}
    for topic in topicAry:
        topicMap[topic] = 1
        df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "bigdataweb01:9092") \
            .option("subscribe", "sex") \
            .load()

The error message is as follows:

Traceback (most recent call last):
  File "/tmp/ZHYCargeProject/demo3/kafka_text.py", line 94, in <module>
    ReadingDataToKafka()
  File "/tmp/ZHYCargeProject/demo3/kafka_text.py", line 23, in ReadingDataToKafka
    df = spark.readStream \
  File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/streaming.py", line 482, in load
    return self._df(self._jreader.load())
  File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

Related version information:

  • python== 3.8.13

  • java==1.8.0_312

  • Spark==3.2.1

  • kafka==2.12-3.20

  • scala==2.12.15

  • kafka-python==2.0.2

  • pyspark==3.1.2

    Try with SparkSession `spark = SparkSession.builder.appName("yourApp").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")` – Rm4n Jun 30 '22 at 09:14
  • 1) You should uninstall `kafka-python` since it isn't used. Spark has its own Kafka producer methods 2) Are you running the code with `spark-submit` or `pyspark` command? 3) I updated the linked post, you might want to check again... Also, how do you have `pyspark==3.1.2` , but Spark `3.2.1`?? – OneCricketeer Jun 30 '22 at 19:06
  • Thank you for helping me solve the problem~ First, I tried `spark = SparkSession.builder.appName("yourApp").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")` and did a small refactor of my code with reference to [this article](https://spark.apache.org/docs/3.2.1/structured-streaming-programming-guide.html). As for the versions, my `pyspark` comes from PyPI, and I use `PyCharm` to write and run the code – ZHYCarge Jul 01 '22 at 02:30
  • Pycharm doesn't matter unless you have it using a virtualenv that has dependencies separate from those you listed. So, it works now? Or not? – OneCricketeer Jul 01 '22 at 09:25
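
For reference, a minimal sketch of the SparkSession approach suggested in the comments above. It is not a verified fix for this cluster: it assumes the Kafka connector version should match the installed pyspark (3.1.2 in the version list), and the app name, broker, and topic are simply taken from the question.

from pyspark.sql import SparkSession

# Pull in the Kafka connector via spark.jars.packages when the session is created.
# The connector version (3.1.2) is assumed to match the installed pyspark package,
# not the standalone Spark 3.2.1 install.
spark = SparkSession.builder \
    .appName("KafkaWordCount") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

# Same Structured Streaming read as in the question.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "bigdataweb01:9092") \
    .option("subscribe", "sex") \
    .load()

If the PYSPARK_SUBMIT_ARGS route is kept instead, the value normally has to end with pyspark-shell and must be set before the SparkSession is created; in the posted code, the `from pyspark.python.pyspark.shell import spark` import appears to start a session before the environment variable is assigned.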
