I want to create a structured stream in Databricks with a Kafka source.
I followed the instructions described here. My script starts, but it fails on the first element of the stream. The stream itself works fine: it produces results, and it works in Databricks when I use confluent_kafka, so there seems to be a different issue I am missing:
After the initial stream is processed, the script times out:
java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 80afdeed-9266-4db4-85fa-66ccf261aee4, runId = b564c626-9c74-42a8-8066-f1f16c7ab53d] failed to stop within 36000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.
What I tried: I found this answer on SO and added
spark.conf.set("spark.sql.streaming.stopTimeout", 36000)
to my setup, which changed nothing.
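As far as I can tell from the message, the exception comes from a stop-with-timeout pattern: when the query is stopped, Spark interrupts the stream execution thread and waits up to spark.sql.streaming.stopTimeout milliseconds for it to exit. The pure-Python analogy below uses plain threading and is not Spark's code; stop_worker, cooperative_loop and blocking_call are hypothetical names. It suggests why raising the timeout may not help: if the thread is stuck in a call that does not return, a longer timeout only delays the same error.

```python
import threading
import time


class StopTimeoutError(Exception):
    """Raised when a worker thread does not exit within the stop timeout."""


def stop_worker(stop_event, worker, stop_timeout_ms):
    # Signal the worker to stop, then wait at most stop_timeout_ms for it.
    # Analogy only: Spark interrupts its stream execution thread instead,
    # because Python threads cannot be interrupted from outside.
    stop_event.set()
    worker.join(timeout=stop_timeout_ms / 1000.0)
    if worker.is_alive():
        raise StopTimeoutError(
            f"worker failed to stop within {stop_timeout_ms} milliseconds"
        )


def cooperative_loop(stop_event):
    # Checks the stop flag between short units of work, so it exits promptly.
    while not stop_event.is_set():
        time.sleep(0.01)


def blocking_call(stop_event, duration_s):
    # Stands in for a call that ignores the stop flag (e.g. a hung network read).
    time.sleep(duration_s)
```

With cooperative_loop, stop_worker returns almost immediately. With blocking_call(event, 0.5) and a 50 ms timeout it raises StopTimeoutError; a timeout above 500 ms would only succeed because the call eventually returns, while a call that never returns fails for any timeout.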
Any input is highly appreciated!
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, TimestampType

# Schema of the JSON payload carried in the Kafka message value
schema = StructType() \
    .add('PARAMETERS_TEXTVALUES_070_VALUES', StringType()) \
    .add('ID', StringType()) \
    .add('PARAMETERS_TEXTVALUES_001_VALUES', StringType()) \
    .add('TIMESTAMP', TimestampType())

# `spark` is the SparkSession that Databricks notebooks provide
df = (
    spark.readStream
    .format("kafka")
    # The broker address goes in kafka.bootstrap.servers; the Kafka source
    # has no separate "host"/"port" options, so those were dropped
    .option("kafka.bootstrap.servers", "stream.xxx.com:12345")
    .option("subscribe", "stream_test.json")
    # The documented option name is startingOffsets (plural)
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers key and value as binary: cast to string and parse the JSON value
df_word = df.select(
    F.col("key").cast("string"),
    F.from_json(F.col("value").cast("string"), schema).alias("parsed_value"),
)

# Write each micro-batch as Parquet; the checkpoint directory tracks progress
query = (
    df_word.writeStream
    .format("parquet")
    .option("path", "dbfs:/mnt/streamfolder/stream/")
    .option("checkpointLocation", "dbfs:/mnt/streamfolder/check/")
    .outputMode("append")
    .start()
)
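Spark's Kafka source is configured through its own documented options (such as subscribe and startingOffsets) plus kafka.-prefixed settings that are forwarded to the Kafka consumer. As far as I know, option names it does not recognise are not rejected, so a misspelling such as startingOffset is easy to miss, and a streaming query then falls back to the default starting offsets of latest. A small pre-flight check can flag such names; unknown_options and the option subset below are illustrative assumptions, not a Spark API.

```python
# Hypothetical pre-flight check; not part of Spark. The set below is a subset
# of the options documented for the Spark Kafka source. Names are compared
# case-insensitively, and anything starting with "kafka." is treated as a
# consumer setting that is passed through.
KNOWN_KAFKA_SOURCE_OPTIONS = {
    "assign",
    "subscribe",
    "subscribepattern",
    "startingoffsets",
    "endingoffsets",
    "failondataloss",
    "maxoffsetspertrigger",
    "minpartitions",
    "kafkaconsumer.polltimeoutms",
}


def unknown_options(options):
    """Return, sorted, option names that are neither known source options nor kafka.*."""
    return sorted(
        name
        for name in options
        if not name.lower().startswith("kafka.")
        and name.lower() not in KNOWN_KAFKA_SOURCE_OPTIONS
    )


# Example: a set of reader options that includes host, port and the singular startingOffset
reader_options = {
    "host": "stream.xxx.com",
    "port": 12345,
    "kafka.bootstrap.servers": "stream.xxx.com:12345",
    "subscribe": "stream_test.json",
    "startingOffset": "earliest",
}

print(unknown_options(reader_options))  # ['host', 'port', 'startingOffset']
```

The check is deliberately conservative: it only knows a subset of options, so an unfamiliar name is a prompt to look it up in the Kafka integration guide rather than proof of a mistake.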
The messages in my stream look like this:
"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"
Furthermore, the stream and check folders contain only 0-byte files, except for metadata, which includes the id from the error above.
Thanks and stay safe.