1

I am using Stomp.py connecting to a standard ACtiveMQ server. I am simulating cases where the receiver crashes and I want to be able to restart it and it continue running from the message after the one that caused it to crash.

I have created two sample scripts:

  • putMessagesToQueue.py - This will put 56 messages into the destination
  • readMessagesFromQueue.py - This will read messages from the destination. If it reads the 6th message it will raise an exception. Each message takes 1 second to process

Steps I take to run the test:

  1. I run putMessagesToQueue.py
  2. I run readMessagesFromQueue.py - it processes 5 messages sucessfully and an exception is raised in message 6
  3. I terminate readMessagesFromQueue.py (ctrl-c)
  4. I run readMessagesFromQueue.py again

For the behaviour I want in step 4 I want it to start processing from message 7.

However I don't see this. If reciever subscribes with ack='auto' then in step 4 it processes no messages - all the messages are gone from the queue and I have lost 50 messages!

If I use ack='client' or ack='client-individual' then on step 4 it starts again from the beginning then crashes again on message 6.

This seems to suggest that the reciever is not processing messages on at a time, instead it is taking every single message at once and running through each one. I don't want this behaviour because I would like to scale up to running 5 recievers and I want the load distributed. At the moment the first reciever I start takes all the messages and starts churning through them and recievers 2-4 just wait for new messages. I want the recievers to take messages one at a time instead!

Can anyone give any hints on how I am implementing this wrong:

Source

putMessagesToQueue.py

import stomp

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

for x in range(0,5):
  conn.send(body="OK-BEFORE-CRASH", destination=destination)

conn.send(body="CRASH", destination=destination)

for x in range(0,50):
  conn.send(body="OK-AFTER-CRASH", destination=destination)

readMessagesFromQueue.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  def __init__(self, processMessage):
    self.processMessage = processMessage
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    self.processMessage(headers, message)

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")

Update 001

I managed to obtain the desired behavour described above by changing the receive function to use ack='client-individual' and to manually send ack messages. (See new version below)

But I am still unable to get the recievers to process one message at a time. This can be demonstrated in the following steps:

  1. I run putMessagesToQueue.py
  2. I run readMessagesFromQueue2.py - it will start processing
  3. In a new terminal run readMessagesFromQueue2.py

At first the second readMessagesFromQueue2 does nothing until the first one crashes, it then starts receiving messages. I want both instances of the reciever to read the messages from the start.

readMessagesFromQueue2.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  conn = None
  def __init__(self, processMessage, conn):
    self.processMessage = processMessage
    self.conn = conn
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    try:
      self.processMessage(headers, message)
    finally:
      self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")
Robert3452
  • 1,354
  • 2
  • 17
  • 39
  • Hi. How do we make sure that the receiver loop is always running? I keep getting this message: "Receiver loop ended". Please help. – user3142747 Jun 02 '20 at 01:12
  • It's hard to tell from the information in your comment. I have wrapped my use of STOMP into a library in this git repo - https://github.com/rmetcalf9/mq_client_abstraction/tree/master/experiment. The Stomp client sub dir under this was working last time I ran it. – Robert3452 Jun 03 '20 at 08:38

1 Answers1

2

Lots of reading of diffent docs and I found the problem.

ActiveMQ has an option prefetch size - https://svn.apache.org/repos/infra/websites/production/activemq/content/5.7.0/what-is-the-prefetch-limit-for.html

If you have few messages that take a long time to process you can set it to 1. This is not apropiate in other situations.

I can do this in stopm.py with the following line: conn.subscribe(destination=destination, id=1, ack='client-individual', headers={'activemq.prefetchSize': 1})

So using manual or auto ack was neither here nor there. The key is limiting prefetch to 1.

Robert3452
  • 1,354
  • 2
  • 17
  • 39
  • In more detail. To make this work I need prefetchsize=1 client-individual ack's, I need to always send them and send them before the long running process that handles the message otherwise the connection may reset causing an unknown ack error – Robert3452 May 05 '20 at 14:42