
I want to create a structured stream in Databricks with a Kafka source. I followed the instructions as described here. My script seems to start, but it fails on the first element of the stream. The stream itself works fine and produces results, and it also works (in Databricks) when I use confluent_kafka, so there seems to be a different issue I am missing:

After the initial stream is processed, the script times out:

java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 80afdeed-9266-4db4-85fa-66ccf261aee4, 
runId = b564c626-9c74-42a8-8066-f1f16c7ab53d] failed to stop within 36000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.

WHAT I TRIED: I looked around SO and found this answer, based on which I added spark.conf.set("spark.sql.streaming.stopTimeout", 36000) to my setup - which changed nothing.
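For reference, the value of spark.sql.streaming.stopTimeout is interpreted in milliseconds (the exception above says so itself), so 36000 is exactly the 36-second window already being reported:

# stopTimeout is in milliseconds: 36000 ms = 36 s, the same window
# reported in the TimeoutException, so this setting changes nothing.
# A larger value (e.g. 120000) would be needed to actually extend it.
spark.conf.set("spark.sql.streaming.stopTimeout", 36000)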

Any input is highly appreciated!

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Define a data schema
schema = StructType() \
           .add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
           .add('ID', StringType())\
           .add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
           .add('TIMESTAMP', TimestampType())


# Note: the Kafka source takes its brokers from 'kafka.bootstrap.servers';
# separate 'host'/'port' options are not part of this source, and the
# offsets option is named 'startingOffsets' (plural).
df = spark \
    .readStream \
    .format("kafka") \
    .option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
    .option('subscribe', 'stream_test.json') \
    .option('startingOffsets', 'earliest') \
    .load()

df_word = df.select(F.col('key').cast('string'),
                    F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))
  

df_word \
      .writeStream \
      .format("parquet") \
      .option("path", "dbfs:/mnt/streamfolder/stream/") \
      .option("checkpointLocation", "dbfs:/mnt/streamfolder/check/") \
      .outputMode("append") \
      .start()
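For debugging, the same writeStream call can keep the handle returned by start(), so the underlying error is visible instead of the generic stop-timeout message (a sketch; status, lastProgress and exception() are standard StreamingQuery members, nothing specific to my setup):

# Keep the handle returned by start() to inspect the query state
# and surface the real exception behind the timeout.
query = df_word.writeStream \
    .format("parquet") \
    .option("path", "dbfs:/mnt/streamfolder/stream/") \
    .option("checkpointLocation", "dbfs:/mnt/streamfolder/check/") \
    .outputMode("append") \
    .start()

print(query.status)        # current status of the streaming query
print(query.lastProgress)  # metrics of the last completed micro-batch
if query.exception():      # underlying StreamingQueryException, if any
    print(query.exception())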

My stream output data looks like this:

"PARAMETERS_TEXTVALUES_070_VALUES": "something"
"ID": "47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES": 12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"

Furthermore, the stream and check folders are filled with 0-byte files, except for metadata, which contains the id from the error above.

Thanks and stay safe.

heck1
  • how much data is in the Kafka topic? did you also call `awaitTermination` at the end of your code? (it is not visible in your question) – Michael Heil Oct 23 '20 at 10:28
  • hi @mike - as I showed, there are 4 columns with some strings in the kafka topic. I tried `awaitTermination` - no change, therefore I omitted it. Thank you :) – heck1 Oct 23 '20 at 10:38

1 Answer


I had the same problem. I checked the driver logs and discovered this exception in the stack trace:

org.apache.spark.SparkException: Failed to store executor broadcast spark_join_relation_3540_1455983219 (size = Some(67371008)) in BlockManager with storageLevel=StorageLevel(memory, deserialized, 1 replicas)

Based on this recommendation I raised the driver memory (from 16 GB to 32 GB in my case) and it solved the issue.
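Note that driver memory has to be in place before the driver JVM starts, so on Databricks this typically means picking a larger driver node type (or setting it in the cluster's Spark config) rather than setting it from the notebook. Outside Databricks, a minimal sketch when building the session yourself:

from pyspark.sql import SparkSession

# Sketch: spark.driver.memory only takes effect if it is set before
# the driver JVM starts; it cannot be changed on a running session.
spark = SparkSession.builder \
    .config("spark.driver.memory", "32g") \
    .getOrCreate()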

This answer on StackOverflow explains why it works.

jd2050