I'm using the custom receiver below to consume data from RabbitMQ in Spark (Scala). Here is my code:
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    override def run() { receive() }
  }.start()
}

def onStop() {
  // There is nothing much to do, as the thread calling receive()
  // is designed to stop by itself once isStopped() returns true
}
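For reference, onStop() can stay empty because the receive() loop is supposed to check isStopped() on every iteration, along the lines of the socket example in the Spark Streaming custom-receiver guide. A minimal sketch of that shape (inside the Receiver[String] subclass, so store(), isStopped(), and restart() are inherited; the host/port parameters and the line-per-record framing are that example's assumptions, not my production code):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Runs on the thread started in onStart(); exits once isStopped()
// returns true or the remote end closes the connection.
private def receive() {
  var socket: Socket = null
  var userInput: String = null
  try {
    socket = new Socket(host, port)
    val reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    userInput = reader.readLine()
    while (!isStopped && userInput != null) {
      store(userInput)                 // hand each record to Spark
      userInput = reader.readLine()
    }
    reader.close()
    socket.close()
    restart("Trying to connect again") // let the framework reconnect
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case t: Throwable =>
      restart("Error receiving data", t)
  }
}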
My actual receive(), though, currently creates the StreamingContext and subscribes to RabbitMQ inside the receiver:

import java.net.Socket
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.stratio.receiver.RabbitMQUtils  // assuming the Stratio spark-rabbitmq connector

/** Create a socket connection and receive data until the receiver is stopped */
def receive() {
  var socket: Socket = null
  var userInput: String = null
  try {
    val batchInterval = Seconds(20)
    val ssc = new StreamingContext(sc, batchInterval)

    // Connection settings (values redacted)
    val host = ""
    val port = ""
    val queueName = ""
    val vHost = ""
    val userName = ""
    val password = ""
    val maxMessagesPerPartition = "1000"
    val maxReceiveTime = "0.9"

    val receiverStream = RabbitMQUtils.createStream(ssc, Map(
      "host" -> host,
      "port" -> port,
      "queueName" -> queueName,
      "vHost" -> vHost,
      "userName" -> userName,
      "password" -> password,
      "maxMessagesPerPartition" -> maxMessagesPerPartition,
      "maxReceiveTime" -> maxReceiveTime
    ))

    val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))
    lines.foreachRDD { rdd =>
      import sqlContext.implicits._   // must be in scope before rdd.toDF
      val df = rdd.toDF
      df.write.format("parquet").mode("append").save("path")  // output path redacted
    }
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  } catch {
    case t: Throwable => restart("Error receiving data", t)
  }
}
I'm getting the timeout error below:
java.util.concurrent.TimeoutException
at org.apache.spark.util.ThreadUtils$.runInNewThreadWithTimeout(ThreadUtils.scala:351)
at org.apache.spark.util.ThreadUtils$.runInNewThread(ThreadUtils.scala:283)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:577)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:87)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:188)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:190)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw.<init>(command-1212830188116081:192)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw.<init>(command-1212830188116081:194)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw.<init>(command-1212830188116081:196)
at lined28d5369d60244b0a66d1d87a30c93a027.$read.<init>(command-1212830188116081:198)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<init>(command-1212830188116081:202)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<clinit>(command-1212830188116081)
at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print$lzycompute(<notebook>:7)
at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print(<notebook>:6)
at lined28d5369d60244b0a66d1d87a30c93a027.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
Does this error mean that the current Spark cluster configuration cannot handle the incoming message load? Is it related to a memory issue? Could someone please assist?
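For completeness: to check whether the RabbitMQ receiver is involved at all, I could start a context with only a dummy queueStream in a fresh session (a minimal sketch; the test payload and the 60-second wait are arbitrary assumptions, and only one StreamingContext can be active per JVM):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Run in a fresh session so no other StreamingContext is active.
val testSsc = new StreamingContext(sc, Seconds(20))
val queue = mutable.Queue[RDD[String]](sc.parallelize(Seq("test")))
testSsc.queueStream(queue).print()

testSsc.start()                           // if this also times out, the
testSsc.awaitTerminationOrTimeout(60000)  // receiver is not the cause
testSsc.stop(stopSparkContext = false)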