
I'm using the custom receiver below to consume data from RabbitMQ in Spark (Scala). Here is my code:

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      val batchInterval = Seconds(20)
      val ssc = new StreamingContext(sc, batchInterval)

      val host = ""
      val port = ""
      val queueName = ""
      val vHost = ""
      val userName = ""
      val password = ""
      val maxMessagesPerPartition = "1000"
      val maxReceiveTime = "0.9"

      val receiverStream = RabbitMQUtils.createStream(ssc, Map(
        "host" -> host,
        "port" -> port,
        "queueName" -> queueName,
        "vHost" -> vHost,
        "userName" -> userName,
        "password" -> password,
        "maxMessagesPerPartition" -> maxMessagesPerPartition,
        "maxReceiveTime" -> maxReceiveTime
      ))

      val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))

      lines.foreachRDD { rdd =>
        import sqlContext.implicits._
        val df = rdd.toDF()
        df.write.format("parquet").mode("append").save("path")
      }

      lines.print()
      ssc.start()
      ssc.awaitTermination()
    } catch {
      case t: Throwable =>
        // Restart the receiver so it attempts to reconnect
        restart("Error receiving data", t)
    }
  }

I'm getting the timeout error below:

java.util.concurrent.TimeoutException
at org.apache.spark.util.ThreadUtils$.runInNewThreadWithTimeout(ThreadUtils.scala:351)
    at org.apache.spark.util.ThreadUtils$.runInNewThread(ThreadUtils.scala:283)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:577)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:87)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:188)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:190)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw.<init>(command-1212830188116081:192)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw.<init>(command-1212830188116081:194)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw.<init>(command-1212830188116081:196)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read.<init>(command-1212830188116081:198)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<init>(command-1212830188116081:202)
    at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<clinit>(command-1212830188116081)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print$lzycompute(<notebook>:7)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print(<notebook>:6)
    at lined28d5369d60244b0a66d1d87a30c93a027.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
     
    

Does this error mean that the current Spark cluster configuration is not able to handle the incoming message load? Is it related to a memory issue? Could someone please assist?


1 Answer


I would suggest increasing spark.driver.memory to a higher value.
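For example, a minimal sketch of where this could go (the app name and the 8g value are just placeholders; on a managed cluster such as Databricks the driver memory is normally set in the cluster configuration rather than in code):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Placeholder values. Note that spark.driver.memory must be set before the
    // driver JVM starts, so in practice this is passed via spark-submit
    // (--conf spark.driver.memory=8g) or the cluster config, not changed at runtime.
    val conf = new SparkConf()
      .setAppName("rabbitmq-streaming")
      .set("spark.driver.memory", "8g")

    val ssc = new StreamingContext(conf, Seconds(20))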

Also try increasing the broadcast timeout (spark.sql.broadcastTimeout).
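For instance, a sketch of raising it on an existing SparkSession (the default is 300 seconds; 1200 is only an illustrative value):

    // Assumes an existing SparkSession named `spark`; 1200 seconds is illustrative.
    spark.conf.set("spark.sql.broadcastTimeout", "1200")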

Refer to this answer by T. Gawęda.
