1

I'm currently trying to implement the pub/sub pattern using node-amqp (https://github.com/postwait/node-amqp).

I'm having some problems implementing it.

What I need:

  • Publish a message from a user
  • Broadcast it to the other users
  • Deliver the message to offline users, who will consume it the next time they connect

What I have so far:

(function () {

    var amqp = require('amqp');

    var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' });
    var app = require('express')();
    var server = require('http').Server(app);
    var io = require('socket.io')(server);

    app.get('/', function (req, res) {
        res.sendfile(__dirname + '/index.html');
    });

    server.listen(8888);

// Wait for connection to become established.


    connection.on('ready', function () {

        var sendMessage = function (queue, msg) {
            connection.publish(queue, JSON.stringify(msg));
        }


        io.sockets.on('connection', function (socket) {

            socket.on('message', function (msg) {
                sendMessage('my-queue', msg);
            });

            connection.queue('my-queue', {autoDelete: false}, function (q) {
                q.bind('#');

                q.subscribe(function (message) {
                    socket.broadcast.emit('news',message);
                });
            });

        });
    });
})()

  • On the index.html page, I connect to the socket server.
  • I have a button that sends a message.
  • I open the index page in two different browsers, so both of my users are connected.
  • If I send a message to the server, it is sent to the other user.
  • If I send a second message to the server, it is sent back to the user who sent it.

The delivery alternates (because I have two users): one message is delivered to the other user, and the next one is delivered back to the user who sent it. What causes this behaviour?

Can you help me correct my code so that it meets my needs properly?

NB: I use RabbitMQ with the standard config on a Windows 7 x64 computer.

EDIT: I found a solution in which every consumer gets the message:

(function () {

    var amqp = require('amqp');

    var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' });
    var app = require('express')();
    var server = require('http').Server(app);
    var io = require('socket.io')(server);

    app.get('/', function (req, res) {
        res.sendfile(__dirname + '/index.html');
    });

    server.listen(8888);

// Wait for connection to become established.


    connection.on('ready', function () {


        connection.exchange('logs', {type: 'fanout', autoDelete: false}, function (exchange) {

            var sendMessage = function (queue, msg) {
                exchange.publish(queue, JSON.stringify(msg));
            }

            io.sockets.on('connection', function (socket) {

                socket.on('message', function (msg) {
                    sendMessage('', msg);
                });

                connection.queue(socket.id, {exclusive: true}, function (q) {
                    q.bind('logs', '');

                    q.subscribe(function (message) {
                        socket.emit('news', message);
                    });
                });

            });
        });
    });
})()

My last problem is that I can't handle offline messages now... Any solutions? (The bounty ends tomorrow :-/)

mfrachet

2 Answers

3

The problem is that RabbitMQ deliberately delivers each message on a queue to a single consumer. That consumer acknowledges that it received the message (the amqp module does this for you automatically), and then the work is done as far as RabbitMQ is concerned, so it deletes the message.

The reason your users take turns receiving a message is that RabbitMQ tries to spread the load of incoming messages evenly over the consumers of a queue, handing them out in round-robin order.
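
The turn-taking described above can be reproduced with a small in-memory model. This is only a sketch of round-robin dispatch, not RabbitMQ or the node-amqp API; `createSharedQueue` and the consumer names `alice` and `bob` are hypothetical names made up for the illustration.

```javascript
// Toy model of ONE queue shared by several consumers (not RabbitMQ itself).
// Each published message is handed to exactly one consumer, in turn.
function createSharedQueue() {
    var consumers = [];
    var next = 0; // index of the consumer that gets the next message

    return {
        subscribe: function (handler) {
            consumers.push(handler);
        },
        publish: function (message) {
            // A real queue would hold the message until a consumer appears;
            // this toy model simply ignores that case.
            if (consumers.length === 0) { return; }
            consumers[next](message);
            next = (next + 1) % consumers.length;
        }
    };
}

var received = { alice: [], bob: [] };
var queue = createSharedQueue();

// Two consumers on the same queue, like two sockets subscribing to 'my-queue'.
queue.subscribe(function (msg) { received.alice.push(msg); });
queue.subscribe(function (msg) { received.bob.push(msg); });

['m1', 'm2', 'm3', 'm4'].forEach(function (msg) { queue.publish(msg); });

console.log(received);
```

In the question's code, each consumer calls `socket.broadcast.emit`, which sends to every socket except its own. So when the sender's own consumer gets the message, the other user sees it, and when the other user's consumer gets it, the message comes back to the sender.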

Your question has been answered before here. Check it out for a solution to your problem!

Jasper Woudenberg
2

You need to create a separate queue for each consumer (in your case, each user) and route messages to all of them from an exchange. That way, when you publish a message, a copy is placed in every user's queue, and each user can consume it independently.

You have this (work queues): http://www.rabbitmq.com/tutorials/tutorial-two-python.html

And you need this (publish/subscribe): http://www.rabbitmq.com/tutorials/tutorial-three-python.html
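
As a rough illustration of the one-queue-per-user idea, here is a minimal in-memory sketch of a fanout exchange. It is a toy model, not the node-amqp API: `createFanoutExchange`, `bindQueue`, `unsubscribe` and the queue names `user.alice` and `user.bob` are all hypothetical. It assumes each queue keeps existing while its user is disconnected, which is what lets a returning user pick up missed messages.

```javascript
// Toy in-memory model of a fanout exchange (not the node-amqp API).
// Every bound queue receives its own copy of each published message.
function createFanoutExchange() {
    var queues = {}; // queue name -> { pending: [...], consumer: function or null }

    // Hand over waiting messages while a consumer is attached.
    function deliver(q) {
        while (q.consumer && q.pending.length > 0) {
            q.consumer(q.pending.shift());
        }
    }

    return {
        bindQueue: function (name) {
            if (!queues[name]) {
                queues[name] = { pending: [], consumer: null };
            }
        },
        publish: function (message) {
            Object.keys(queues).forEach(function (name) {
                queues[name].pending.push(message);
                deliver(queues[name]);
            });
        },
        subscribe: function (name, handler) {
            queues[name].consumer = handler;
            deliver(queues[name]); // flush anything stored while offline
        },
        unsubscribe: function (name) {
            queues[name].consumer = null; // the queue itself stays alive
        }
    };
}

var exchange = createFanoutExchange();
var inbox = { alice: [], bob: [] };

exchange.bindQueue('user.alice');
exchange.bindQueue('user.bob');
exchange.subscribe('user.alice', function (m) { inbox.alice.push(m); });
exchange.subscribe('user.bob', function (m) { inbox.bob.push(m); });

exchange.publish('hello');
exchange.unsubscribe('user.bob');          // bob goes offline
exchange.publish('while you were away');   // stored in bob's queue
exchange.subscribe('user.bob', function (m) { inbox.bob.push(m); }); // bob returns

console.log(inbox);
```

Mapping this back to RabbitMQ would presumably mean naming each queue after a stable user identifier rather than `socket.id`, and declaring it so that it survives the user disconnecting. That mapping is an assumption for illustration, not something stated in the tutorials linked above.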

OhJeez
  • Thank you for your help. I now have a problem concerning the offline data... Could you help me? I edited my post. – mfrachet Mar 17 '15 at 08:05