
I get the following error while running my Spark Streaming application. It is a large application running multiple stateful (with mapWithState) and stateless operations. It's getting difficult to isolate the error, since Spark itself hangs and the only error we see is in the Spark log, not in the application log itself.

The error happens only after about 4-5 minutes with a micro-batch interval of 10 seconds. I am using Spark 1.6.1 on an Ubuntu server with Kafka-based input and output streams.

Please note that it's not possible for me to provide a minimal code example to reproduce this bug, as it does not occur in unit test cases and the application itself is very large.

Any direction you can give to solve this issue will be helpful. Please let me know if I can provide any more information.

Error inline below:

[2017-07-11 16:15:15,338] ERROR Error cleaning broadcast 2211 (org.apache.spark.ContextCleaner)
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:77)
        at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
  • Please check my [answer](https://stackoverflow.com/a/40722515/647053); it looks like you are hitting the default 120-second timeout. – Ram Ghadiyaram Jul 18 '17 at 18:52
  • @RamGhadiyaram Please look at the first error; it reports an error cleaning a broadcast. Are you saying that itself is happening because of the timeout? The workload as such is not changing; it's a Kafka input stream with a constant input rate. – tsar2512 Jul 18 '17 at 21:11
  • Yes. See the `doCleanupBroadcast` method in [ContextCleaner.scala](https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/ContextCleaner.scala); the error message says it is a timeout during the cleanup task. Please increase the timeout period; it should help you. – Ram Ghadiyaram Jul 19 '17 at 18:21
  • Was it helpful? Were you able to resolve it? – Ram Ghadiyaram Aug 02 '17 at 05:22
  • Hi @RamGhadiyaram, yes, it was helpful. – tsar2512 Aug 02 '17 at 13:40

1 Answer


Your exception message clearly says that it is an RPC timeout caused by the default configuration of 120 seconds; adjust it to an optimal value for your workload. Please see the Spark 1.6 configuration documentation.

Your error messages org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds] and at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) confirm that.
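
As an illustration (not part of the original answer), here is a minimal sketch of raising the timeout on the SparkConf before the contexts are built. The property names are standard Spark settings, but the 600s value and the app name are purely illustrative and should be tuned to your workload; in Spark 1.6, spark.rpc.askTimeout falls back to spark.network.timeout when it is not set, so raising both keeps them consistent.

    // Hedged sketch: raise the RPC ask timeout (values are examples, not recommendations).
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("my-streaming-app")        // hypothetical app name
      .set("spark.rpc.askTimeout", "600s")   // default is 120s
      .set("spark.network.timeout", "600s")  // fallback used when askTimeout is unset

    // 10-second micro-batches, matching the interval described in the question
    val ssc = new StreamingContext(conf, Seconds(10))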


For a better understanding, please see the code below from RpcTimeout.scala:

  /**
   * Wait for the completed result and return it. If the result is not available within this
   * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
   * @param  awaitable  the `Awaitable` to be awaited
   * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
   *         is still not ready
   */
  def awaitResult[T](awaitable: Awaitable[T]): T = {
    try {
      Await.result(awaitable, duration)
    } catch addMessageIfTimeout
  }
}
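
Here Await.result throws the plain java.util.concurrent.TimeoutException once duration (derived from spark.rpc.askTimeout) elapses, and addMessageIfTimeout rewraps it as the RpcTimeoutException that names the controlling property. That is exactly the pair of exceptions in your stack trace, which is why increasing the timeout addresses the broadcast-cleaning error.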