I'm trying to send and receive messages with Apache ActiveMQ Artemis 2.24, sending from Python over STOMP and receiving in Java. Versions: stomp.py 8.0.1, Python 3.10.4, Java 1.8.0_342.
If I run my Java-based message consumer and send a message from Java code, everything works fine. But if I send a message to the queue using stomp.py, I get the following exception on the receiving side:
    Exception in thread "main" java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1864388729 is greater than readableBytes=10
        at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
        at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
        at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:103)
        at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
        at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:54)
The message consumer looks like this:
    import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
    import org.apache.activemq.artemis.api.core.RoutingType;
    import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
    import org.apache.activemq.artemis.api.core.client.ClientConsumer;
    import org.apache.activemq.artemis.api.core.client.ClientMessage;
    import org.apache.activemq.artemis.api.core.client.ClientSession;
    import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
    import org.apache.activemq.artemis.api.core.client.ServerLocator;

    public class ActiveMQConsumerMain
    {
        public static void main(String[] args) throws Exception
        {
            ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );

            // We need a queue attached to the address ...
            try
            {
                session.createQueue("example", RoutingType.ANYCAST, "example", true);
            }
            catch( ActiveMQQueueExistsException amqe )
            {
                if( amqe.getMessage().contains("already exists" ))
                {
                    // no problem, our queue already exists
                    System.out.println( "Queue already exists on server!" );
                }
                else
                {
                    amqe.printStackTrace();
                }
            }

            // And a consumer attached to the queue ...
            ClientConsumer consumer = session.createConsumer("example");
            session.start();

            while( true )
            {
                System.out.println( "Listening..." );
                ClientMessage msgReceived = consumer.receive();
                System.out.println("message = " + msgReceived.getBodyBuffer().readString());
                msgReceived.acknowledge();
                session.commit();
            }
        }
    }
The Java message producer that works looks like this:
    import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
    import org.apache.activemq.artemis.api.core.RoutingType;
    import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
    import org.apache.activemq.artemis.api.core.client.ClientMessage;
    import org.apache.activemq.artemis.api.core.client.ClientProducer;
    import org.apache.activemq.artemis.api.core.client.ClientSession;
    import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
    import org.apache.activemq.artemis.api.core.client.ServerLocator;

    public class ActiveMQProducerMain
    {
        public static void main(String[] args) throws Exception
        {
            ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
            ClientProducer producer = session.createProducer("example");
            ClientMessage message = session.createMessage(true);
            message.getBodyBuffer().writeString("Hello world!!!");

            // We need a queue attached to the address ...
            try
            {
                session.createQueue("example", RoutingType.ANYCAST, "example", true);
            }
            catch( ActiveMQQueueExistsException amqe )
            {
                if( amqe.getMessage().contains("already exists" ))
                {
                    // no problem, our queue already exists
                    System.out.println( "Queue already exists on server!" );
                }
                else
                {
                    amqe.printStackTrace();
                }
            }

            // Once we have a queue, we can send the message ...
            producer.send(message);
        }
    }
And the Python message producer code is as follows:
    import time
    import stomp

    def main():
        print( "Sending ActiveMQ message using STOMP client!\n" )
        conn = stomp.Connection( [('172.16.1.141', 61613)] )
        conn.connect( wait=True, headers={'consumerWindowSize': 0})
        conn.send(body='Hello Python World', destination='example')
        time.sleep(5)
        conn.disconnect()
        exit()

    if __name__ == "__main__":
        main()
EDIT 1:
I tried something else: I set up a stomp.py message consumer, and when I run that and send the message with stomp.py, everything works fine. So both the Java and Python client libraries fundamentally "work", but something about the combination of the two (e.g., sending from Python, receiving in Java) is breaking.
EDIT 2:
Also, if I run the Python-based message consumer and send using the Java message producer, the Python code receives a message, but the content of the message appears to be empty. So, again, it looks like there is some mismatch between what's happening in "Java land" and what's happening in "Python land."
Any thoughts on what could be causing this?
EDIT 3:
I tried switching to readNullableSimpleString() per the answer from Justin below, so now my code looks like this:
    System.out.println("message = " + msgReceived.getBodyBuffer().readNullableSimpleString() );
and now I get this:
    java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1701604463 is greater than readableBytes=13
        at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
        at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
        at org.apache.activemq.artemis.api.core.SimpleString.readNullableSimpleString(SimpleString.java:158)
        at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:69)
        at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:64)
It looks like a call to readNullableSimpleString() still ultimately triggers a call to readSimpleString(), which errors out.
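One thing that stands out in both traces is the huge "length" value. A minimal Python sketch suggests these numbers are just the raw text of my message being read as a 4-byte big-endian integer. The helper name `misread_length` is made up for illustration, and the number of bytes each Java reader consumes before the length field (4 for readString(), 1 for readNullableSimpleString()) is my assumption from reading the stack traces, not something I have confirmed in the Artemis source:

```python
import struct

# Body sent by the stomp.py producer above.
BODY = b"Hello Python World"

def misread_length(body: bytes, skipped: int) -> tuple[int, int]:
    """Interpret the 4 bytes after `skipped` as a big-endian signed int.

    Returns (bogus_length, bytes_left_after_that_int).
    Hypothetical helper, only meant to mimic what the Java reader seems to do.
    """
    (length,) = struct.unpack_from(">i", body, skipped)
    return length, len(body) - skipped - 4

# Assumption: readString() consumes its own 4-byte int before delegating.
print(misread_length(BODY, 4))  # (1864388729, 10)

# Assumption: readNullableSimpleString() consumes a 1-byte null marker first.
print(misread_length(BODY, 1))  # (1701604463, 13)
```

Both pairs match the length and readableBytes values in the two exceptions, which would mean the body holds plain bytes with no length prefix, so any of the string readers ends up treating text ("o Py" and "ello") as a length.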
EDIT 4:
I still don't understand the "why" of exactly what's happening under the hood, but I did find a way to get my message successfully in the consumer. This mechanism works:
    int bodySize = msgReceived.getBodySize();
    byte[] bytes = new byte[bodySize];
    msgReceived.getBodyBuffer().readBytes( bytes );
    System.out.println("message = " + new String( bytes ) );
It looks like the STOMP message is being turned into a BytesMessage and not a TextMessage under the hood, or something close to that. Some older discussion I found suggests that this, in turn, relates to the presence or absence of the content-length header. Weird. But for now I'm happy that I at least have a path forward, even if it's not the perfect one.
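To make the content-length point concrete, here is a rough sketch of what a STOMP SEND frame looks like on the wire with and without that header. `build_send_frame` is a hypothetical helper written only for illustration; it is not part of stomp.py. My understanding, which I have not verified against this broker, is that stomp.py adds content-length by default (its Connection constructor appears to accept an `auto_content_length` option to turn that off), and that Artemis uses the presence of the header to decide between a bytes-style body and a text-style body:

```python
def build_send_frame(destination: str, body: bytes, with_content_length: bool) -> bytes:
    """Assemble a STOMP SEND frame by hand (illustration only, not stomp.py API)."""
    headers = [f"destination:{destination}"]
    if with_content_length:
        # The header carries the body size in bytes.
        headers.append(f"content-length:{len(body)}")
    # Command line, header lines, a blank line, then the body and a NUL terminator.
    head = "SEND\n" + "\n".join(headers) + "\n\n"
    return head.encode("utf-8") + body + b"\x00"

body = b"Hello Python World"
print(build_send_frame("example", body, with_content_length=True))
print(build_send_frame("example", body, with_content_length=False))
```

If that understanding is right, sending without the header should let the original readNullableSimpleString() call work on the Java side, while keeping the header means reading raw bytes as in the snippet above.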