19

I have a spark streaming job that read from Kafka every 5 seconds, does some transformation on incoming data, and then writes to the file system.

This doesn't really need to be a streaming job, and really, I just want to run it once a day to drain the messages onto the file system. I'm not sure how to stop the job though.

If I pass a timeout to the streamingContext.awaitTermination, it doesn't stop the process, all it does is cause the process to spawn errors when it comes time to iterate on the stream (see error below)

What is the best way to accomplish what I'm trying to do

this is for Spark 1.6 on Python

EDIT:

thanks to @marios the solution was this:

ssc.start()
ssc.awaitTermination(10)
ssc.stop()

that runs the script for ten seconds before stopping.

simplified code:

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
stream = KafkaUtils.createStream(
    ssc,
    kafkaParams["zookeeper.connect"],
    "vehicle-data-importer",
    topicPartitions,
    kafkaParams)

stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()
ssc.awaitTermination(10)

error:

16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
    at com.sun.proxy.$Proxy14.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)
lostinplace
  • 1,538
  • 3
  • 14
  • 38
  • How could you know the data source is out? By the way, you can call `ssc.stop()` after `ssc.awaitTermination` to stop the Streaming application. – zsxwing Jan 29 '16 at 22:25
  • IMHO, if you just need to read the data once in a day then create a Spark Batch job to read and process the data and further use some scheduler like cron or Quartz to schedule your job. – Sumit Jan 31 '16 at 02:39
  • The problem with executing a single batch (using createRDD) is that there is no easy way to track the offsets within zookeeper. THat's one of the things I'd like to achieve here – lostinplace Feb 01 '16 at 14:23
  • I'd considered calling `ssc.stop` but I was having trouble figuring out how to call it asynchronously – lostinplace Feb 01 '16 at 14:24

3 Answers3

11

It seems that the right method to call is awaitTerminationOrTimeout(self, timeout).

I am not sure if it also stops the streaming context. So maybe you can call a ssc.stop() right after the timeout ends.

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()

Note: Take a look here for a similar question.

Community
  • 1
  • 1
marios
  • 8,874
  • 3
  • 38
  • 62
1

Have a try Kafka "consumer.timeout.ms" parameter, which will gracefully end KafkaReceiver.(from kafka 0.8 configuration)

Throw a timeout exception to the consumer if no message is available for consumption after the specified interval

HDF = KafkaUtils.createStream(ssc, topics={strLoc : 1}, kafkaParams={"consumer.timeout.ms":"20000" }, zkQuorum='xxx:2181', groupId='xxx-consumer-group')

You will not able to receive any new kafka messages in current streaming execution and always get empty RDDs.
And check the count of empty RDDs in DSteam.foreachRDD(func). Terminate streaming exectuion if you continurously get empty RDDs.

Shawn Guo
  • 3,169
  • 3
  • 21
  • 28
  • tried this: `kafkaParams = { "metadata.broker.list": str.join(',',brokers), "zookeeper.connect": str.join(',',zkNodes), "consumer.id": "vehicle-data-importer", "group.id":"importers", "auto.offset.reset": "smallest", "consumer.timeout.ms": "10000" }` there was no effect – lostinplace Feb 01 '16 at 14:27
  • "consumer.timeout.ms" will not stop streaming execution but will terminate the kafka receiver to wait for more messages even new message arrived later. you can simply use "ssc.awaitTerminationOrTimeout(10)" but it is not safe. – Shawn Guo Feb 02 '16 at 03:59
0

The issue here spark 1.6 onwards the ssc.stop call in the same thread as the Dstream processing thread would create a deadlock as stop would wait for poller thread to complete creating deadlock.sp the call for stop from another thread

Ajith Kannan
  • 812
  • 1
  • 8
  • 30