I'm using Stratio/spark-rabbitmq to read message from rabbitMQ. I could not find a way to acknowledge the messages once the processing of the message is done in the spark.
Following is the code snipet:
val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2",
"queueName" -> config.getString("application.rabbitmq.queue"),
"exchange" -> config.getString("application.rabbitmq.exchange"),
"host" -> config.getString("application.rabbitmq.host"),
"port" -> config.getString("application.rabbitmq.port"),
"routingKeys" -> config.getString("application.rabbitmq.queue") )
/*val distributeKey = Seq(
RabbitMQDistributedKey(
config.getString("application.rabbitmq.queue"),
new ExchangeAndRouting(config.getString("application.rabbitmq.exchange"), "query.analyzer"),
rabbitParams
)
)*/
val receiverStream: InputDStream[String] = RabbitMQUtils.createStream(
ssc,
rabbitParams)
receiverStream.start()
receiverStream.foreachRDD(
rdd => {
if(!rdd.isEmpty()){
// Processing
}
}