
I'm using Stratio/spark-rabbitmq to read messages from RabbitMQ. I could not find a way to acknowledge a message once Spark has finished processing it.

Following is the code snippet:

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2",
      "queueName" -> config.getString("application.rabbitmq.queue"),
      "exchange" -> config.getString("application.rabbitmq.exchange"),
      "host" -> config.getString("application.rabbitmq.host"),
      "port" -> config.getString("application.rabbitmq.port"),
      "routingKeys" -> config.getString("application.rabbitmq.queue") )


 /*val distributeKey = Seq(
   RabbitMQDistributedKey(
     config.getString("application.rabbitmq.queue"),
     new ExchangeAndRouting(config.getString("application.rabbitmq.exchange"), "query.analyzer"),
     rabbitParams
   )
  )*/



  val receiverStream: InputDStream[String] = RabbitMQUtils.createStream(
    ssc,
    rabbitParams)


  receiverStream.start()

  receiverStream.foreachRDD(
    rdd => {
      if (!rdd.isEmpty()) {
        // Processing
      }
    }
  )
Rajat Mishra