We are currently facing this issue, and none of the suggested "Similar questions" helped us solve it. We are new to both Docker and Spark.
We used the following Docker Compose file to set up our containers:
networks:
  spark_net:
volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: jupyterlab
    container_name: jupyterlab
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    networks:
      - spark_net
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "7575"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./var/run/docker.sock
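To rule out a plain connectivity problem first, the broker can be probed from inside the jupyterlab container. This is just a minimal sketch using kafka-python, with the Compose-generated container name that also appears in our scripts below:

from kafka import KafkaConsumer

# Sketch: check that the broker answers on the INSIDE listener (port 9093).
check = KafkaConsumer(bootstrap_servers=['twitter-streaming_kafka_1:9093'])
print(check.topics())  # should contain 'corona' once the producer has run
check.close()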
We also created two Python files to test whether Kafka streaming works:
Producer:
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['twitter-streaming_kafka_1:9093'],
                         api_version=(0, 11, 5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number': e}
    producer.send('corona', value=data)
    time.sleep(0.5)
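Since KafkaProducer.send() is asynchronous, it is probably worth flushing before the script exits, so buffered records are actually delivered even if the sleep is removed:

producer.flush()  # block until all buffered records have been sent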
Consumer:
import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

print('starting consumer')
consumer = KafkaConsumer(
    'corona',
    bootstrap_servers=['twitter-streaming_kafka_1:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

print('printing messages')
for message in consumer:
    message = message.value
    print(message)
Both scripts worked when we executed them in separate CLIs inside our jupyterlab container. But when we try to connect to our producer stream via PySpark with the following code, we get the error shown in the stack trace below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9093").option("subscribe", "corona").load()
We also executed the following command in the spark-master CLI:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
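Our guess is that --packages only affects that spark-shell session and not the SparkSession created in the notebook. If so, the connector would presumably have to be supplied to PySpark as well, e.g. via the spark.jars.packages config. An untested sketch; the artifact version is copied from the spark-shell command above and has to match the cluster's Spark/Scala build:

from pyspark.sql import SparkSession

# Sketch: make the Kafka connector available to the notebook's session.
# spark.jars.packages must be set before the session (and its JVM) starts;
# an already-running session will not pick it up.
spark = (SparkSession.builder
         .appName('KafkaStreaming')
         .config('spark.jars.packages',
                 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')
         .getOrCreate())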
Stack trace:
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-2-4dba09a73304> in <module>
6
7 spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
----> 8 df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "twitter-streaming_kafka_1:9093").option("subscribe", "corona").load()
/usr/local/lib/python3.7/dist-packages/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
418 return self._df(self._jreader.load(path))
419 else:
--> 420 return self._df(self._jreader.load())
421
422 @since(2.0)
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;