32

I have a Spark Streaming job which has been running continuously. How do I stop the job gracefully? I have read the usual recommendations of attaching a shutdown hook in the job and sending a SIGTERM to the process.

sys.ShutdownHookThread {
  // Runs when the JVM receives a shutdown signal (e.g. SIGTERM)
  logger.info("Gracefully stopping Application...")
  // Wait for in-flight batches to finish before tearing down the context
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  logger.info("Application stopped gracefully")
}

It seems to work but does not look like the cleanest way to stop the job. Am I missing something here?

From a code perspective that may make sense, but how do you use it in a cluster environment? If we start a Spark Streaming job (we distribute jobs across all the nodes in the cluster), we have to keep track of the PID of the job and the node it is running on, so that when we need to stop it we can send the signal to the right process on the right node. I was just hoping there would be a simpler way of job control for streaming jobs.

Saket

4 Answers

24

You can stop your streaming context in cluster mode by running the following command, without needing to send a SIGTERM. This stops the streaming context without you having to explicitly stop it from a shutdown hook.

$SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID

$MASTER_REST_URL is the REST URL of the Spark master, i.e. something like spark://localhost:6066

$DRIVER_ID is something like driver-20150915145601-0000
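
Putting the example values above together, the full command would look something like this (substitute your own master REST URL and driver ID):

$SPARK_HOME_DIR/bin/spark-submit --master spark://localhost:6066 --kill driver-20150915145601-0000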

If you want Spark to stop your app gracefully, you can try setting the following configuration property when your Spark app is initially submitted (see http://spark.apache.org/docs/latest/submitting-applications.html for how to set Spark configuration properties).

spark.streaming.stopGracefullyOnShutdown=true
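
For example, the property can be passed on the command line with spark-submit's --conf flag (the class name and jar below are placeholders for your own application):

$SPARK_HOME_DIR/bin/spark-submit --conf spark.streaming.stopGracefullyOnShutdown=true --class com.example.MyStreamingApp my-streaming-app.jar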

This is not officially documented, and I gathered this from looking at the 1.4 source code. This flag is honored in standalone mode. I haven't tested it in clustered mode yet.

I am working with spark 1.4.*

ud3sh
  • Hi ud3sh, how can I do this in YARN cluster mode with Spark 1.3? – zwb Apr 14 '16 at 22:27
  • 'spark.streaming.stopGracefullyOnShutdown' is now officially documented on the Spark configuration page http://spark.apache.org/docs/latest/configuration.html#spark-streaming – avr May 11 '16 at 10:16
  • What if you instantiated multiple jobs per context and want to stop jobs individually? – Ömer Faruk Almalı Aug 16 '16 at 08:43
  • How do you get the DRIVER_ID? I don't find anything like that in the logs, or in the YARN UI. – Shikkou May 16 '19 at 14:28
4

It depends on the use case and on how the driver can be used.

Consider the case where you want to collect some N records (tweets) from Spark Structured Streaming, store them in PostgreSQL, and stop the stream once the count crosses N records.

One way of doing this is to use an accumulator and Python threading.

  • Create a Python thread that is given the streaming query object and the accumulator, and stops the query once the count is crossed.
  • When starting the streaming query, pass the accumulator and update its value for each batch of the stream.

Sharing the code snippet for understanding/illustration purposes...

import threading
import time


def check_n_stop_streaming(query, acc, num_records=3500):
    # Poll the accumulator once a second; stop the streaming query
    # once more than num_records records have been processed.
    # print_info is a logging helper defined elsewhere in the original code.
    while True:
        print_info(f"Number of records received so far {acc.value}")
        if acc.value > num_records:
            query.stop()
            break
        time.sleep(1)
...

# Accumulator shared between the stop-checking thread and the foreachBatch callback
count_acc = spark.sparkContext.accumulator(0)

...

def postgresql_all_tweets_data_dump(df,
                                    epoch_id,
                                    raw_tweet_table_name,
                                    count_acc):
    print_info("Raw Tweets...")
    df.select(["text"]).show(50, False)
    # Accumulators support += in PySpark; this adds the batch size to the running count
    count_acc += df.count()

    # The self._postgresql_* values come from the enclosing class in the original code
    mode = "append"
    url = "jdbc:postgresql://{}:{}/{}".format(self._postgresql_host,
                                              self._postgresql_port,
                                              self._postgresql_database)
    properties = {"user": self._postgresql_user,
                  "password": self._postgresql_password,
                  "driver": "org.postgresql.Driver"}
    df.write.jdbc(url=url, table=raw_tweet_table_name, mode=mode, properties=properties)

...

query = tweet_stream.writeStream.outputMode("append"). \
    foreachBatch(lambda df, id :
                 postgresql_all_tweets_data_dump(df=df,
                                                 epoch_id=id,
                                                 raw_tweet_table_name=raw_tweet_table_name,
                                                 count_acc=count_acc)).start()





stop_thread = threading.Thread(target=check_n_stop_streaming, args=(query, count_acc,))
stop_thread.setDaemon(True)
stop_thread.start()

query.awaitTermination()
stop_thread.join()

Mageswaran
-2

If all you need is to stop a running streaming application, then the simplest way is via the Spark admin UI (you can find its URL in the startup logs of the Spark master).

There is a section in the UI that shows running streaming applications, with a tiny (kill) link next to each application ID.

Vladimir Kroz
  • Where? I only see little `(kill)` buttons next to each *stage* (on a single receiver), but if I kill one of them another one is quickly spawned – Felipe Dec 30 '16 at 07:31
-4

It is official now; please look at the original Apache documentation here: http://spark.apache.org/docs/latest/configuration.html#spark-streaming

Dave
  • Please share specific content from the link related to the answer. Merely posting a link doesn't solve OP's problem. – Pirate X Jun 14 '16 at 10:26