
I'm having a really weird issue using the Alpakka AMQP connector and Akka Streams.

When my RabbitMQ message broker restarts, the source seems to restart fine. However, once it has restarted, the stream never completes, and the message gets lost in a partition further downstream. If I start the AMQP server first and then my Akka app, everything works fine; it's the other way around that breaks.

Here's how I initialize my AmqpSource:

val amqpMessageSource = builder.add {
  val amqpSource = AmqpSource(
    NamedQueueSourceSettings(connectionDetails, amqpInMessageQueue).withDeclarations(queueDeclaration),
    bufferSize = 10
  ).map { message =>
    fromIncomingMessage(message)
  }.initialDelay(5.seconds)
  // attempts = -1 retries indefinitely; initialDelay(5.seconds) above is meant to space the retries 5 seconds apart
  amqpSource.recoverWithRetries(-1, { case _ => amqpSource })
}

I also tried removing the partition where the message gets lost and wiring the source straight to the flow that is relevant for my example. The result is even weirder: in that case, the AMQP client no longer reads messages from RabbitMQ at all.

I'm obviously missing something here, but none of the many things I've tried has solved the problem.

Jeffrey Chung
Le G
