I am trying to implement fail-over in my Python applications using STOMP to connect to ActiveMQ brokers (in Amazon MQ).

I have 2 brokers (B1, B2), 1 producer (P1), 1 consumer (C1).

I expected that the producer (P1) and the consumer (C1) would each only need to connect to a single broker (B1), and that whenever that broker goes down they would automatically connect to the other broker (B2), since the two brokers are in a network of brokers. That is not what happens: when B1 goes down or is restarted, P1 loses its connection and the script ends.

Producer:

import stomp
import time

host1 = "<host1>"
host2 = "<host2>"
port1 = 61614
username = "<username>"
password = "<password>"

addresses = [(host1, port1)]
conn = stomp.Connection(addresses)
conn.set_ssl(
    for_hosts=addresses,
    cert_file="<path to cert file>",
    key_file="<path to key file>"
)

conn.connect(username, password, wait=True)
print("Successfully connected.")

# conn.send(body="Testing {}".format(int(time.time())), destination="/queue/queue1", headers={"AMQ_SCHEDULED_DELAY": 15000})

for i in range(0, 100):
    conn.send(body="Testing {} {}".format(int(time.time()), i), destination="/queue/queue1", headers={"persistent": "true", "AMQ_SCHEDULED_DELAY": 15000})
    time.sleep(3)

conn.disconnect()

Consumer:

import stomp
import json


class MyListener(stomp.ConnectionListener):
    def on_connecting(self, host_and_port):
        super().on_connecting(host_and_port)
        print("connecting to {}".format(host_and_port))

    def on_error(self, frame):
        print("Received an error {}".format(frame.body))

    def on_message(self, frame):
        print("Received a message {}".format(frame.body))
        lambda_payload = json.dumps({"body": frame.body})

host1 = "<host 1>"
host2 = "<host 2>"
port1 = 61614
username = "<username>"
password = "<password>"

addresses = [(host2, port1)]

conn = stomp.Connection(addresses)
conn.set_ssl(
    for_hosts=addresses,
    cert_file="<path to cert file>",
    key_file="<path to key file>"
)

conn.set_listener("subscriber1", MyListener())
conn.connect(username, password, wait=True)
print("Successfully connected")


conn.subscribe(destination="/queue/queue1", id=1, ack="auto")

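# Keep the main thread alive without spinning the CPU;
# messages are delivered to the listener on a background thread.
import time
while True:
    time.sleep(1)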

ActiveMQ configuration for B1:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<broker schedulePeriodForDestinationPurge="10000" schedulerSupport="true" xmlns="http://activemq.apache.org/schema/core">
  <persistenceAdapter>
    <kahaDB concurrentStoreAndDispatchQueues="false"/>
  </persistenceAdapter>
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" topic="&gt;">
          <pendingMessageLimitStrategy>
            <constantPendingMessageLimitStrategy limit="1000"/>
          </pendingMessageLimitStrategy>
        </policyEntry>
        <policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" queue="&gt;"/>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <plugins>
  </plugins>
  <networkConnectors>
    <networkConnector conduitSubscriptions="false" duplex="true" name="NetworkConnectorTest1" uri="static:(<URI to 2nd broker>)" userName="<user name>"/>
  </networkConnectors>
  <transportConnectors>
    <transportConnector name="openwire" rebalanceClusterClients="true" updateClusterClients="true" updateClusterClientsOnRemove="true"/>
  </transportConnectors>
</broker>

Comments:

  • The STOMP protocol itself does not define any fail-over or reconnection semantics for connections. Therefore, this functionality is up to client implementations. As far as I can tell, this really has nothing to do with ActiveMQ but rather with the Python-based STOMP client being used. – Justin Bertram Feb 28 '23 at 20:19
  • Your expectation that the producer (P1) and consumer (C1) will automatically reconnect to the other broker (B2) because both brokers are in a "network of brokers" is invalid. You must take care of reconnecting in your application(s). – Justin Bertram Feb 28 '23 at 20:23
  • @JustinBertram Doesn't ActiveMQ handle that? Their documentation mentions the same thing, with Java code as well. Oh, you mean that ActiveMQ provides the feature, but it is up to the client whether to take advantage of it? – ans98 Mar 01 '23 at 05:24
  • The broker cannot handle anything in the client. The broker has the active/passive functionality for high availability, but the client is responsible for taking advantage of that. That said, ActiveMQ "Classic" does ship an OpenWire-based JMS client implementation which can take advantage of this and reconnect automatically, but you aren't using that client. You're using a generic Python STOMP client, which means you have to manage the reconnect yourself or potentially use the reconnection features available in the client. – Justin Bertram Mar 03 '23 at 16:10
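
To make the "manage the reconnect yourself" suggestion from the comments concrete, here is a minimal sketch of client-side fail-over. It is only an illustration, not something stomp.py or ActiveMQ ships: the helper name `connect_with_failover`, its parameters, and the `connect` callable are all hypothetical. The idea is to walk the list of broker addresses round-robin and return the first connection that succeeds.

```python
import itertools
import time


def connect_with_failover(addresses, connect, max_attempts=6, delay=1.0,
                          sleep=time.sleep):
    """Try each broker address in turn until connect(address) succeeds.

    `connect` is a caller-supplied callable (hypothetical, not a stomp.py
    API) that takes one (host, port) tuple and either returns a connected
    client or raises an exception.
    """
    last_error = None
    # Cycle B1, B2, B1, B2, ... but stop after max_attempts tries in total.
    for _, address in zip(range(max_attempts), itertools.cycle(addresses)):
        try:
            return connect(address)
        except Exception as exc:  # deliberately broad in this sketch
            last_error = exc
            sleep(delay)  # fixed pause before trying the next broker
    raise ConnectionError(
        "no broker reachable after {} attempts".format(max_attempts)
    ) from last_error
```

With the scripts from the question, `connect` could be a small function that builds `stomp.Connection([address])`, calls `set_ssl(...)` and `connect(username, password, wait=True)`, and returns the connection. The consumer's listener could then override `on_disconnected` to call the helper again with `[(host1, port1), (host2, port1)]` and re-subscribe, and the producer could do the same when `send` fails. As far as I know, `stomp.Connection` also accepts several `(host, port)` tuples plus options such as `reconnect_attempts_max`, but those apply while a connection is being established, so a connection that drops later still has to be re-established by the application.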
