1

We are facing this issue at the moment and all the displayed "Similar questions" did not help to solve our problem. We are new to docker and also to spark.

We used the following Docker Compose to setup our containers:


networks:
  spark_net:

volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: jupyterlab
    container_name: jupyterlab
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    networks:
      - spark_net
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
      
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
      
  kafka:
    image: wurstmeister/kafka
    ports:
      - "7575"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./var/run/docker.sock

We also created two pythonm files to test if kafka streaming works:

producer

import json
import time

producer = KafkaProducer(bootstrap_servers = ['twitter-streaming_kafka_1:9093'],
                         api_version=(0,11,5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))
for e in range(1000):
    data = {'number' : e}
    producer.send('corona', value=data)
    time.sleep(0.5)

Consumer:

import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

print('starting consumer')
consumer = KafkaConsumer(
    'corona',
     bootstrap_servers=['twitter-streaming_kafka_1:9093'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

print('printing messages')
for message in consumer:
    message = message.value
    print(message)

When we executed both scripts in different CLIs in our jupyterlab container and it worked. When we want to connect to our producer stream via pyspark with the following code we get the mentioned error.

import random
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SparkSession
        
spark = Spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9093").option("subscribe", "corona").load()

We also executed the following command in the spark-master CLI:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...

stacktrace

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-2-4dba09a73304> in <module>
      6 
      7 spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
----> 8 df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "twitter-streaming_kafka_1:9093").option("subscribe", "corona").load()

/usr/local/lib/python3.7/dist-packages/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
    418             return self._df(self._jreader.load(path))
    419         else:
--> 420             return self._df(self._jreader.load())
    421 
    422     @since(2.0)

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

1 Answers1

0

Your Kafka container needs to be placed on the spark_net network in order for Spark containers to resolve it by name

Same with Jupyter if you want it to be able to launch jobs on the Spark cluster

Also, you need to add the Kafka package

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • We tried that also and its still not working. Some additional ideas? Regards Chris – Christian_22113 Nov 12 '20 at 21:26
  • It would help if you provided the complete stacktrace. But, 1) Your PySpark code isn't loading any Kafka packages, and they are not available by default 2) `KAFKA_ADVERTISED_HOST_NAME` needs to be the name of the Kafka container, not 127.0.0.1, or removed entirely because you have advertised listeners – OneCricketeer Nov 13 '20 at 17:48
  • added the complete stacktrace at the end of our question @OneCricketeer – Christian_22113 Nov 16 '20 at 15:57
  • Yes, "Failed to find data source" means you have not added the equivalent `--packages` for your PySpark setup like you did in spark-shell – OneCricketeer Nov 16 '20 at 17:11