0

I'm putting together a prototype of a streaming data ingestion from MySQL through Debezium through Kafka and onto Spark using Docker.

My docker-compose.yml file links this way:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
  spark-master:
      image: docker.io/bitnami/spark:3.3
      environment:
        - SPARK_MODE=master
        - SPARK_RPC_AUTHENTICATION_ENABLED=no
        - SPARK_RPC_ENCRYPTION_ENABLED=no
        - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
        - SPARK_SSL_ENABLED=no
      ports:
        - '8080:8080'
  spark-worker:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    links:
      - kafka
  jupyter:
    image: jupyter/pyspark-notebook
    environment:
      - GRANT_SUDO=yes
      - JUPYTER_ENABLE_LAB=yes
      - JUPYTER_TOKEN=mysecret
    ports:
      - "8888:8888"
    volumes:
      - /Users/eugenegoldberg/jupyter_notebooks:/home/eugene
    depends_on:
      - spark-master  

My Jupyter notebook (also served by the same docker-compose) has the following PySpark code, which attempt to connect Spark to Kafka:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os

spark_version = '3.3.1'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)

packages = [
    f'org.apache.kafka:kafka-clients:3.3.1'
]

# Create SparkSession
spark = SparkSession.builder \
    .appName("Kafka Streaming Example") \
    .config("spark.driver.host", "host.docker.internal") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

# Define the Kafka topic and Kafka server/port
topic = "dbserver1.inventory.customers"
kafkaServer = "kafka:9092" # assuming kafka is running on a container named 'kafka'

# Read data from kafka topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafkaServer) \
  .option("subscribe", topic) \
  .load()

I'm getting this error:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[9], line 32
     24 kafkaServer = "kafka:9092" # assuming kafka is running on a container named 'kafka'
     26 # Read data from kafka topic
     27 df = spark \
     28   .readStream \
     29   .format("kafka") \
     30   .option("kafka.bootstrap.servers", kafkaServer) \
     31   .option("subscribe", topic) \
---> 32   .load()

File /usr/local/spark/python/pyspark/sql/streaming.py:469, in DataStreamReader.load(self, path, format, schema, **options)
    467     return self._df(self._jreader.load(path))
    468 else:
--> 469     return self._df(self._jreader.load())

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".    

What do I need to change in order to fix this?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Eugene Goldberg
  • 14,286
  • 20
  • 94
  • 167
  • You dont need both `spark.jars.packages` and `PYSPARK_SUBMIT_ARGS = --packages...`. How did you verify the container is using Spark 3.3.1? – OneCricketeer Jan 24 '23 at 17:37

1 Answers1

0

tl;dr Neither spark-master nor spark-worker have the necessary library on their CLASSPATHs and hence the infamous AnalysisException: Failed to find data source: kafka.

See Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)? for some background.

My guess is that you should add the jar to jupyter (so when it executes Spark code it knows where to find the required classes). See https://stackoverflow.com/a/36295208/1305344 for a solution.

There is also Jupyter Docker Stacks build manifests that comes directly from Jupyter project.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
  • "should add the jar to jupyter" ; isn't that what `spark.jars.packages` does? Uses Livy to download JARs? – OneCricketeer Jan 24 '23 at 17:38
  • It has worked for me using local master - https://github.com/OneCricketeer/docker-stacks/blob/master/hadoop-spark/spark-notebooks/kafka-sql.ipynb – OneCricketeer Jan 24 '23 at 17:39