I want to read the data sent by the kafka producer, but I encountered the following problem:
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
Then, according to the error message, I tried to search the official documentation and this website,Then I found something error like me:link1 However, through these methods I found that I still can't solve this problem, so want to ask if there is any better way to help me solve it Attached below is my error code and version information My error code:
from kafka import KafkaProducer
from pyspark.python.pyspark.shell import spark
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
import json
import sys
import os
import findspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1'
findspark.init()
def ReadingDataToKafka():
spark_conf = SparkConf().setAppName("KafkaWordCount")
sc = SparkContext.getOrCreate(conf=spark_conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("file:///tmp/ZHYCargeProject")
topics = 'sex'
topicAry = topics.split(",")
topicMap = {}
for topic in topicAry:
topicMap[topic] = 1
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "bigdataweb01:9092") \
.option("subscribe", "sex") \
.load()
The error message is as follows:
Traceback (most recent call last):
File "/tmp/ZHYCargeProject/demo3/kafka_text.py", line 94, in <module>
ReadingDataToKafka()
File "/tmp/ZHYCargeProject/demo3/kafka_text.py", line 23, in ReadingDataToKafka
df = spark.readStream \
File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/streaming.py", line 482, in load
return self._df(self._jreader.load())
File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1309, in __call__
return_value = get_return_value(
File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
Related version information:
python== 3.8.13
java==1.8.0_312
Spark==3.2.1
kafka==2.12-3.20
scala==2.12.15
kafka-python==2.0.2
pyspark==3.1.2