0

I'm trying to send and receive messages using Apache ActiveMQ Artemis 2.24, using Python to send messages using STOMP, and receiving messages in Java. The Stomp.py version is 8.0.1. Python version is 3.10.4. Java version is 1.8.0_342.

If I run my Java based message consumer and send a message using Java code, everything works fine. But if I send a message to the queue using Stomp.py I get the following exception on the receive side:

Exception in thread "main" java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1864388729 is greater than readableBytes=10
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:103)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
    at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:54)

The message consumer looks like this:

import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ActiveMQConsumerMain 
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");

        ClientSessionFactory factory =  locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );

        // We need a queue attached to the address ...

        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, our queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }
        
        // And a consumer attached to the queue ...

        ClientConsumer consumer =  session.createConsumer("example");

        session.start();
        
        while( true )
        {
            System.out.println( "Listening..." );
            ClientMessage msgReceived = consumer.receive();

            System.out.println("message = " + msgReceived.getBodyBuffer().readString());
            
            msgReceived.acknowledge();
        
            session.commit();
        }
    }
}

The Java message producer that works looks like this:

import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ActiveMQProducerMain 
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");

        ClientSessionFactory factory =  locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
        
        ClientProducer producer = session.createProducer("example");
        ClientMessage message = session.createMessage(true);
        message.getBodyBuffer().writeString("Hello world!!!");

        // We need a queue attached to the address ...

        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, out queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }

        // Once we have a queue, we can send the message ...

        producer.send(message); 
    }

}

And the Python message producer code is as follows:

import time
import stomp

def main():
    print( "Sending ActiveMQ message using STOMP client!\n" )
    
    conn = stomp.Connection( [('172.16.1.141', 61613)] )
    
    conn.connect( wait=True, headers={'consumerWindowSize': 0})
    
    conn.send(body='Hello Python World', destination='example')
    time.sleep(5)
    conn.disconnect()
    
    exit()
    
if __name__ == "__main__":
    main()
    

EDIT 1:

Tried something else - I set up a Stomp.py message consumer and when I run that, and send the message with Stomp.py, everything works fine. So it seems that both the Java and Python client libraries fundamentally "work" but something about the intersection of the two (eg, sending from Python, receiving in Java) is breaking.

EDIT 2:

Also, if I run the Python based message consumer, and send using the Java message producer, the Python code receives a message, but the content of the message appears to be empty. So, again, it looks like there is some weird mismatch between what's happening in "Java land" and what's happening in "Python land."

Any thoughts on what could be causing this?

EDIT 3:

I tried switching to readNullableSimpleString() per the answer from Justin below, so now my code looks like this:

System.out.println("message = " + msgReceived.getBodyBuffer().readNullableSimpleString() );

and now I get this:

java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1701604463 is greater than readableBytes=13
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
    at org.apache.activemq.artemis.api.core.SimpleString.readNullableSimpleString(SimpleString.java:158)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:69)
    at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:64)

Looks like a call to readNullableSimpleString still ultimately triggers a call to readSimpleString() which errors out.

EDIT 4:

I still don't understand the "why" of exactly what's happening under the hood, but I did find a way to get my message successfully in the consumer. This mechanism works:

                int bodySize = msgReceived.getBodySize();
                byte[] bytes = new byte[bodySize];
                msgReceived.getBodyBuffer().readBytes( bytes );
                System.out.println("message = " + new String( bytes ) );

Looks like the STOMP message is being turned into a BytesMessage and not a TextMessage under the hood? Or something close to that. And some older discussion I found suggests that this does, in turn, relate to the presence or absence of the content-length header. Weird. But for now I'm happy I at least a path forward, even if it's not the perfect path forward.

mindcrime
  • 657
  • 8
  • 23
  • Can you enable debug logging as described in the documentation [here](https://activemq.apache.org/components/artemis/documentation/latest/stomp.html#logging) and [here](https://activemq.apache.org/components/artemis/documentation/latest/logging.html#activating-trace-for-a-specific-logger), reproduce the problem, and paste the logs into your question? That would help narrow down the problem. It looks like the client is setting a `content-length` header no matter what. – Justin Bertram Sep 18 '22 at 02:53
  • I think I'm *starting* to understand. The Python client library has an auto_content_length setting which is True by default. And when that is on, there is a content-length header. If I'm following this correctly, in that case the Java code winds up with a BytesMessage in hand and that's why I can read the message using readBytes(). But if you disable that, then I'm not sure what happens. When I set it to False, no combination of readString(), readSimpleString(), readNullableSimpleString(), etc. manages to read anything. – mindcrime Sep 18 '22 at 03:08
  • For what it's worth the documentation cited in my answer discusses how the `content-length` header is handled and how it impacts how the message is encoded. – Justin Bertram Sep 18 '22 at 03:08
  • Gotcha. I'll read up on it. For now, at least I know I have a way to move forward which is good. This is my first time trying to use STOMP with ActiveMQ, and clearly there are some pitfalls to watch out for! – mindcrime Sep 18 '22 at 03:09
  • 1
    Keep in mind that the core API is pretty low-level and not commonly used. Most folks use a standard API like JMS or a standard protocol like STOMP, MQTT, or AMQP on both ends. – Justin Bertram Sep 18 '22 at 03:11
  • 1
    The debug logging I requested previously should help clarify the situation as it will log all incoming and outgoing STOMP frames. – Justin Bertram Sep 18 '22 at 03:12
  • Yeah, I'll probably dig into that tomorrow. One thing I do see is that the getType() method on the Message does flip between 3 (TEXT) and 4 (BYTES) based on the presence of absence of that content-length header, as expected. The only part I don't understand now is why the TextMessage that comes when there is no content-length header apparently can't be read. Probably just something I don't understand about the API. Or maybe there's a bug, who knows? – mindcrime Sep 18 '22 at 03:15

2 Answers2

1

When the broker receives a STOMP message with no content-length header then it encodes the body as a nullable SimpleString using the writeNullableSimpleString method on org.apache.activemq.artemis.api.core.ActiveMQBuffer. Therefore, when you receive that message as a core message you need to read that data as a nullable SimpleString using the readNullableSimpleString method on org.apache.activemq.artemis.api.core.ActiveMQBuffer. This is outlined in the documentation.

The reason this is working with your core producer and core consumer is they are using writeString and readString respectively.

Likewise, it doesn't work with your core producer and Python consumer because the core producer is using writeString and the message is being converted into a STOMP MESSAGE frame using readNullableSimpleString which, of course, doesn't work.

It is important to note that how you get the ActiveMQBuffer from the ClientMessage matters due to the way we track the index on the buffer internally. For example, the JavaDoc on ClientMessage#getBodyBuffer states:

The buffer to write the body.

Warning: If you just want to read the content of a message, use getDataBuffer() or getReadOnlyBuffer();

Therefore, when you use readNullableSimpleString you'll want to do so by using either msgReceived.getDataBuffer().readNullableSimpleString() or msgReceived.getReadOnlyBuffer().readNullableSimpleString()

Lastly, keep in mind that the body of a message is ultimately just an array of bytes. The broker has no way of knowing what kind of data is in the array. It could be human readable text or it could be binary data, and even if it was text it could be encoded any number of ways. In order for clients to exchange messages with each other they must use a common format. In this case, Java clients using the core API must use a nullable SimpleString.

Justin Bertram
  • 29,372
  • 4
  • 21
  • 43
  • I tried changing to reading the text with readNullableSimpleString() and still get basically the same error (see edit in original question). Also, it appears that the stomp.py library automatically adds a content-length header unless you turn that off explicitly. I tried turning it off and setting a content-length header manually and saw the same result. Any other ideas on what might be going on? – mindcrime Sep 18 '22 at 02:22
  • 1
    I updated my answer to clarify _how_ to get the `ActiveMQBuffer` for reading. – Justin Bertram Sep 18 '22 at 15:54
  • Awesome. I'll make your answer as "Accepted" since it led me to figuring out what was going on originally. And that little API distinction was the last missing piece of making this work. – mindcrime Sep 18 '22 at 22:13
-1

OK, think I finally figured out what's going on here. There's a couple of different issues at play, and the interactions of them was making this confusing early on.

My current understanding:

  1. stomp.py has a setting "auto_content_length" that controls whether or not the library automatically adds a content-length header to the message. And then the ActiveMQ broker creates different Message types from the STOMP message depending on whether or not that header is present. If the header is present, you get a BytesMessage. Otherwise you get a TextMessage. The auto_content_length is True by default, so when all this started I was getting BytesMessage's when I naively expected Text (hey, I'm sending a String, right?).

  2. Now on the Java side, you have to use the API differently depending on which type of Message you get. If you get a BytesMessage, you call getBodyBuffer() and then read the bytes using readBytes(). I was originally getting BytesMessage and calling getBodyBuffer(), but mistakenly trying to use the various readString() methods, which was wrong.

  3. Also on the Java side, if you get a TextMessage (what you get if you turn the content-length header off on the Python/STOMP side) then you have to do things a little differently. You call getDataBuffer() on the Message object, and then you use readNullableSimpleString() to read the underlying text.

So this will successfully read the message contents when there is no content-length header.

     ActiveMQBuffer tempBuff = msgReceived.getDataBuffer();
     System.out.println( "tempBuff: " + tempBuff.readNullableSimpleString() );

So in the end, my problem was a combination of:

  1. Not knowing about that auto_content_length setting
  2. Not originally understanding how ActiveMQ uses the content-length header to change the Message type under the hood
  3. Not realizing that you have to call either getBodyBuffer() / readBytes(), or getDataBuffer() / readNullableSimpleString(), based on which type of Message you are handling.

With that understanding in hand, I can now read messages on the Java side with or without the content-length header being set on the Python side.

mindcrime
  • 657
  • 8
  • 23