I have written a C worker app (using rabbitmq-c) that consumes from a queue populated by a Python script (using pika).
I have the following strange behaviour which I can't seem to solve:
- Starting all the workers before messages are published to the queue works as expected.
- Starting 1 worker after the messages have been published works as expected.
- HOWEVER: Starting additional workers after a worker has started consuming from the queue means that those workers don't see any messages on the queue (message count = 0) and therefore just wait, even though there should still be many messages on the queue. Killing the first worker suddenly starts messages flowing to all the other (waiting) consumers.
Any ideas what could be going on?
I've tried making sure that each consumer has its own channel (is this necessary?), but I still see the same behaviour.
Here's the code for the consumer (worker):
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",                      /* vhost */
           0,                        /* channel_max */
           131072,                   /* frame_max */
           0,                        /* heartbeat */
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");
if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");
if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,       /* passive */
                            0,       /* durable */
                            0,       /* exclusive */
                            0,       /* auto_delete */
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");
LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);
amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
/* The three zeros are no_local, no_ack and exclusive */
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while (1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);
    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;
        amqp_simple_wait_frame(conn, &f); // METHOD frame
        amqp_simple_wait_frame(conn, &f); // HEADER frame
        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");
        while (n) {
            amqp_simple_wait_frame(conn, &f); // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }
        // do something with buf
        LOG_INFO(" [x] Message received from queue\n");
    }
    amqp_destroy_envelope(&e);
    amqp_maybe_release_buffers(conn);
}
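
Separately from the delivery problem, the body-copy loop above logs an error on a size mismatch but still goes on to copy, so an oversized message could overrun the 8-byte buf. A minimal sketch of a bounds-checked copy step might look like the following. Note that append_fragment is a hypothetical helper (not part of rabbitmq-c), and it takes plain pointers and lengths rather than amqp_bytes_t so that it compiles on its own:

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper (not part of rabbitmq-c): append one body fragment
 * to a fixed-size buffer.
 * Returns 0 on success, or -1 if the fragment would not fit, in which
 * case nothing is copied and *used is left unchanged. */
static int append_fragment(unsigned char *buf, size_t cap, size_t *used,
                           const void *frag, size_t frag_len)
{
    /* Reject anything that would write past the end of buf. */
    if (*used > cap || frag_len > cap - *used)
        return -1;

    memcpy(buf + *used, frag, frag_len);
    *used += frag_len;
    return 0;
}
```

In the loop above, this would stand in for the memcpy call, passing f.payload.body_fragment.bytes and f.payload.body_fragment.len as the fragment, and breaking out of the loop when it returns -1.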