Firstly, I apologise if this question has been handled elsewhere; I just haven't found anything that solved my particular problem.

I have a gateway server which receives messages externally. It puts them in a queue that my order processing server is listening on. My order processing server listens on 2 queues (in a thread): Queue 1 - gateway server, Queue 2 - clearing server.

So in my order processor, I have worker threads. I'm using ExecutorService to manage my threads. The problem is in the worker thread.

In the worker thread I make two instances of my MQ class, which I use to publish messages to either the clearing server or the gateway server. I basically need to do some processing and then publish the resulting message to these queues.

What I want to know is, should I close the channel and connection in my worker thread each time I finish dealing with a message?

If I don't close the MQ connections in every worker thread after processing a message, then after processing 800-900 messages I start getting the following exception intermittently:

java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Unknown Source)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:307)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
    at interfaces.MQ.<init>(MQ.java:41)
    at orderProcessor.ProcessOrders.<init>(ProcessOrders.java:109)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:52)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
    at orderProcessor.ProcessIncomingCSThread.routeIncoming(ProcessIncomingCSThread.java:45)
    at interfaces.ProcessIncomingThread.run(ProcessIncomingThread.java:47)

If I do close the connections in every worker thread after processing a message, then after a while I get the following exception intermittently:

Exception in thread "AMQP Connection 127.0.0.1:5672" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Unknown Source)
    at com.rabbitmq.client.impl.ChannelManager.scheduleShutdownProcessing(ChannelManager.java:108)
    at com.rabbitmq.client.impl.ChannelManager.handleSignal(ChannelManager.java:94)
    at com.rabbitmq.client.impl.AMQConnection.finishShutdown(AMQConnection.java:696)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:669)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)

I am using basicQos when I create a MQ connection for consumption to keep RabbitMQ's internal queue reasonable.

I create my MQ connections for consumption in the following manner:

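_channel.queueDeclare(this._mqName.toString(), true, false, false, null);
// Set the prefetch limit before registering the consumer so that it applies to this consumer.
_channel.basicQos(50);
// Note: autoAck is true here; the prefetch limit only takes effect with manual acknowledgements (autoAck = false).
_channel.basicConsume(this._mqName.toString(), true, _consumer);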

Thanks for looking at this; any suggestions or help would be much appreciated. It is more than likely that I'm not doing things correctly for my context.

Robin Green
OriginalCliche
  • You should not need to close connections and channels every time you send a message. Also, what load are we talking about here? Is the server entering flow state when sending? – hveiga Nov 29 '13 at 00:04
  • What do you mean by flow state? I have tried both approaches, closing and not closing the queues. I make my gateway server produce 2k dummy orders and push them to the queue that the Order Processor is listening on. Things would've been much easier if my worker thread didn't need to subsequently push messages onto queues, as then I could avoid having MQ connections for publishing in my worker threads. – OriginalCliche Nov 29 '13 at 00:08
  • Flow state happens in RabbitMQ when the broker is not able to route the message fast enough from the exchanges to the queues. When this happens, the producers get throttled and they start accumulating messages internally waiting for the broker to be able to handle the load. If you keep trying to send messages but the broker is in flow state, the message will be stored in the JVM memory which means that at some point, you will run out of memory. To check if you are getting into flow state, you need to check the connections in the broker for flow or blocked state when sending messages. – hveiga Nov 29 '13 at 00:14
  • Pretty sure it's not in flow state, as I receive the messages almost immediately. – OriginalCliche Nov 29 '13 at 00:21
  • Then it seems to be a memory leak as Robin mentioned. – hveiga Nov 29 '13 at 00:32

3 Answers


Seems like you have a memory leak. Use a profiler.

Robin Green

I'm not familiar with RabbitMQ, but I suspect that either you or RabbitMQ are trying to create more threads than the OS is configured to handle.
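One way to check that suspicion is to log the JVM's live thread count while messages are being processed. The following is only a minimal sketch using the standard `java.lang.management` API; the class name `ThreadCountProbe` and its helper methods are made up for illustration, and the five dummy threads merely stand in for whatever the application is really starting.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class ThreadCountProbe {
    private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    /** Number of live threads (daemon and non-daemon) in this JVM right now. */
    static int liveThreads() {
        return THREADS.getThreadCount();
    }

    /** Highest live thread count seen since the JVM started (or since the peak was reset). */
    static int peakThreads() {
        return THREADS.getPeakThreadCount();
    }

    public static void main(String[] args) {
        System.out.println("live threads at start: " + liveThreads());

        // Dummy threads that block until released; they stand in for threads
        // the application (or a client library) would start.
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }

        System.out.println("live threads after starting 5 more: " + liveThreads());
        System.out.println("peak threads so far: " + peakThreads());
        release.countDown();
    }
}
```

If the live count keeps climbing with every processed message instead of levelling off, something is starting threads that never finish.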

Maybe these two links can help you:

Matt Ke
kovica

Thanks for the input guys. I resolved the issue.

I was creating a connection in every worker thread. Now I create the connection on the main thread and pass it down to the worker threads, which create their channels from that connection. This seems to work a treat.

This, however, means I'll have to redesign my MQ class to handle this workflow.
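A rough sketch of that pattern follows. So that it runs without a broker, it does not use the RabbitMQ client at all: `SimConnection` and `SimChannel` are hypothetical stand-ins for a real connection and channel, and the queue name `"clearing"` is made up. The one assumption taken from the stack traces above is that opening a connection starts a thread of its own (`AMQConnection.start` calls `Thread.start`), whereas a channel is modelled here as a lightweight object without one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedConnectionSketch {

    /** Hypothetical stand-in for a broker connection: each instance owns one I/O thread. */
    static final class SimConnection implements AutoCloseable {
        static final AtomicInteger ioThreadsStarted = new AtomicInteger();
        private final CountDownLatch closed = new CountDownLatch(1);

        SimConnection() {
            Thread ioThread = new Thread(() -> {
                try {
                    closed.await(); // pretend to read frames until the connection is closed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "sim-connection-io");
            ioThread.setDaemon(true);
            ioThread.start();
            ioThreadsStarted.incrementAndGet();
        }

        SimChannel createChannel() {
            return new SimChannel();
        }

        @Override
        public void close() {
            closed.countDown(); // lets the I/O thread finish
        }
    }

    /** Hypothetical stand-in for a channel: cheap to create, no thread of its own. */
    static final class SimChannel implements AutoCloseable {
        static final AtomicInteger published = new AtomicInteger();

        void publish(String queue, String message) {
            published.incrementAndGet();
        }

        @Override
        public void close() {
            // nothing to release in this simulation
        }
    }

    /**
     * Processes messageCount messages on a worker pool. The connection is created once
     * by the caller's thread; every task opens and closes its own channel on it.
     * Returns how many connection I/O threads were started during the run.
     */
    static int processWithSharedConnection(int messageCount) throws InterruptedException {
        int before = SimConnection.ioThreadsStarted.get();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try (SimConnection connection = new SimConnection()) {
            for (int i = 0; i < messageCount; i++) {
                final String order = "order-" + i;
                pool.submit(() -> {
                    try (SimChannel channel = connection.createChannel()) {
                        channel.publish("clearing", order);
                    }
                });
            }
            // Wait for the workers before the connection is closed.
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return SimConnection.ioThreadsStarted.get() - before;
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = processWithSharedConnection(2000);
        System.out.println("connection threads started: " + threads);
        System.out.println("messages published: " + SimChannel.published.get());
    }
}
```

In this simulation the number of connection threads stays at one regardless of how many messages are processed, whereas constructing a `SimConnection` inside each task would start one thread per message. With the real client, each worker would likewise create its own channel from the shared connection rather than share one channel between threads.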

OriginalCliche