I am trying to implement fail-over in my Python applications using STOMP to connect to ActiveMQ brokers (in Amazon MQ).
I have 2 brokers (B1, B2), 1 producer (P1), 1 consumer (C1).
I am expecting that I only have to connect to a single broker(B1) in producer(P1), and consumer(C1) and whenever the connected broker(B1) is down, the producer(P1) and consumer(C1) automatically connects to another broker(B2) since they both are in a network. But the scenario is not happening, what happens is, when B1 is down/restarted the P1's connection to it is lost and the script ends.
Producer:
# from stomp import Connection
import stomp
import time
host1 = "<host1>"
host2 = "<host2>"
port1 = 61614
username = "<username>"
password = "<password>"
addresses = [(host1, port1)]
conn = stomp.Connection(addresses)
conn.set_ssl(
for_hosts=addresses,
cert_file="<path to cert file>",
key_file="<path to key file>"
)
conn.connect(username, password, wait=True)
print("Successfully connected.")
# conn.send(body="Testing {}".format(int(time.time())), destination="/queue/queue1", headers={"AMQ_SCHEDULED_DELAY": 15000})
import time
for i in range(0, 100):
conn.send(body="Testing {} {}".format(int(time.time()), i), destination="/queue/queue1", headers={"persistent": "true", "AMQ_SCHEDULED_DELAY": 15000})
time.sleep(3)
conn.disconnect()
Consumer:
import stomp
import json
class MyListener(stomp.ConnectionListener):
def on_connecting(self, host_and_port):
super().on_connecting(host_and_port)
print("connecting to {}".format(host_and_port))
def on_error(self, frame):
print("Received an error {}".format(frame.body))
def on_message(self, frame):
print("Received a message {}".format(frame.body))
lambda_payload = json.dumps({"body": frame.body})
host1 = "<host 1>"
host2 = "<host 2>"
port1 = 61614
username = "<username>"
password = "<password>"
addresses = [(host2, port1)]
conn = stomp.Connection(addresses)
conn.set_ssl(
for_hosts=addresses,
cert_file="<path to cert file>",
key_file="<path to key file>"
)
conn.set_listener("subscriber1", MyListener())
conn.connect(username, password, wait=True)
print("Successfully connected")
conn.subscribe(destination="/queue/queue1", id=1, ack="auto")
while True:
pass
ActiveMQ configuration for B1
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<broker schedulePeriodForDestinationPurge="10000" schedulerSupport="true" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<kahaDB concurrentStoreAndDispatchQueues="false"/>
</persistenceAdapter>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" topic=">">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" queue=">"/>
</policyEntries>
</policyMap>
</destinationPolicy>
<plugins>
</plugins>
<networkConnectors>
<networkConnector conduitSubscriptions="false" duplex="true" name="NetworkConnectorTest1" uri="static:(<URI to 2nd broker>)" userName="<user name>"/>
</networkConnectors>
<transportConnectors>
<transportConnector name="openwire" rebalanceClusterClients="true" updateClusterClients="true" updateClusterClientsOnRemove="true"/>
</transportConnectors>
</broker>