I am trying to write a RabbitMQ Consumer in Go. Which is suppose to take the 5 objects at a time from the queue and process them. Moreover, it is suppose to acknowledge if successfully processed else send to the dead-letter queue for 5 times and then discard, it should be running infinitely and handling the cancellation event of the consumer. I have few questions :
- Is there any concept of
BasicConsumer
vsEventingBasicConsumer
in RabbitMq-go Reference? - What is
Model
in RabbitMQ and is it there in RabbitMq-go? - How to send the objects when failed to dead-letter queue and again re-queue them after
ttl
- What is the significance of
consumerTag
argument in thech.Consume
function in the below code - Should we use the
channel.Get()
orchannel.Consume()
for this scenario?
What are the changes i need to make in the below code to meet above requirement. I am asking this because i couldn't find decent documentation of RabbitMq-Go.
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
Edited question:
I have delayed the processing of the messages as suggested in the links link1 link2. But the problem is messages are getting back to their original queue from dead-lettered queue even after ttl. I am using RabbitMQ 3.0.0
. Can anyone point out what is the problem?