
I am following along with the code in Spark: The Definitive Guide. I ran into an issue where the following code does not print results in the Jupyter notebook when I include the commented-out line awaitTermination(). With awaitTermination() included, the Jupyter kernel is busy and stays busy for a long time, possibly indefinitely.

Without "awaitTermination" the code works fine.

Can someone explain this behavior? How can I overcome it?

static = spark.read.json(r"/resources/activity-data/")
dataSchema = static.schema
streaming = (spark
             .readStream
             .schema(dataSchema)
             .option("maxFilesPerTrigger", 1)
             .json(r"/resources/activity-data/")
            )
activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
activityQuery = (activityCounts
                 .writeStream
                 .queryName("activity_counts")
                 .format("memory")
                 .outputMode("complete")
                 .start()
                )
#activityQuery.awaitTermination()
#activityQuery.stop()
from time import sleep
for x in range(5):
    spark.table("activity_counts").show()
    sleep(1)

1 Answer


Yes; please see this documentation as a reference (https://docs.databricks.com/spark/latest/structured-streaming/production.html), and page 352 of Spark: The Definitive Guide explains it as well.

Spark Streaming jobs are continuous applications, and in production activityQuery.awaitTermination() is required because it prevents the driver process from terminating while the stream is active (in the background).

If the driver is killed, the application is killed too, so activityQuery.awaitTermination() acts as a sort of fail-safe. If you want to turn off the stream in Jupyter, you can run activityQuery.stop() to reset the query for testing purposes ... I hope this helps.

activityDataSample = 'path/to/data'
spark.conf.set("spark.sql.shuffle.partitions", 8)
static = spark.read.json(activityDataSample)
dataSchema = static.schema
static.printSchema()

streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
.json(activityDataSample)

activityCounts = streaming.groupBy("gt").count()

activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete")\
.start()

# simulates a continuous stream for testing (Ctrl-C to kill the app)
'''
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("console").outputMode("complete")\
.start()
activityQuery.awaitTermination()
'''

spark.streams.active # query stream is active
[<pyspark.sql.streaming.StreamingQuery at 0x28a4308d320>]

from time import sleep
for x in range(3):
    spark.sql("select * from activity_counts").show(3)
    sleep(2)
+---+-----+
| gt|count|
+---+-----+
+---+-----+

+--------+-----+
|      gt|count|
+--------+-----+
|    bike|10796|
|    null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows

+--------+-----+
|      gt|count|
+--------+-----+
|    bike|10796|
|    null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows

activityQuery.stop() # stop query stream
spark.streams.active # no active streams anymore
[]
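
If you do want a notebook cell to block without hanging the kernel indefinitely, note that awaitTermination() also accepts an optional timeout in seconds and returns whether the query terminated within that window. A minimal sketch, reusing the activityQuery started above (the 30-second timeout is just an arbitrary value for illustration):

# block for at most 30 seconds; returns True if the query terminated, False otherwise
finished = activityQuery.awaitTermination(30)
if not finished:
    # the query is still running, so the in-memory table can be inspected
    spark.table("activity_counts").show(3)
    activityQuery.stop()  # stop the query once you are done testing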
thePurplePython
  • I added activityQuery.stop() right after activityQuery.awaitTermination(), but the kernel stays busy for a long time. The only way I can explore the data is after interrupting the kernel. – Keerikkattu Chellappan Apr 25 '19 at 12:26
  • So you are using the "memory" sink, which is only meant for debugging/testing, just like the "console" sink. You would never use this in a production environment. If you switch to "console" mode you can view the micro-batch results in your terminal for testing purposes. Your code is fine for learning purposes, and I added some code to my response to confirm. Does this answer your question? – thePurplePython Apr 25 '19 at 14:28
  • So what is the summary? With awaitTermination(), dataframe.show() will not print the DataFrame in the Jupyter notebook. – Keerikkattu Chellappan Apr 25 '19 at 16:33
  • Are you running this on a laptop? If so, then you aren't scaling anything ... it is just running in memory on your machine. The summary is that Spark streaming jobs are designed to be continuous applications, and awaitTermination() contributes to that. I am not an expert, but I remember thinking this topic was confusing too; this doc helps: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html. The only notebook I know of that provides solid integration with Spark Streaming applications is Databricks; Jupyter is not meant to scale/stream continuous distributed apps. – thePurplePython Apr 25 '19 at 17:07
  • This post might help: https://stackoverflow.com/questions/47357418/how-to-get-the-output-from-console-streaming-sink-in-zeppelin – thePurplePython Apr 25 '19 at 17:11
  • I am trying out Spark streaming in IBM Jupyter Labs. I have work coming up to analyse a web log that is being streamed to a Kafka topic, so my source would be Kafka. I need to analyse the data every hour and send out an alert if issues are found. The key is that I only need to process one hour's worth of data at the end of that hour; in the next hour I process only that hour's data. What is the most efficient way to do this? Is my problem best solved with Spark batch or Spark streaming? I don't think I need to write the data to any sink. – Keerikkattu Chellappan Apr 26 '19 at 18:19