I m actually trying to implement the pub/sub pattern using node-amqp (https://github.com/postwait/node-amqp).
I have some problems to implement it.
What I need :
- Publish message from a user
- Broadcast it to others user
- Sending the message to offline users that will consume it the next time they'll be connected
What I've actually :
(function () {
var amqp = require('amqp');
var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' });
var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);
app.get('/', function (req, res) {
res.sendfile(__dirname + '/index.html');
});
server.listen(8888);
// Wait for connection to become established.
connection.on('ready', function () {
var sendMessage = function (queue, msg) {
connection.publish(queue, JSON.stringify(msg));
}
io.sockets.on('connection', function (socket) {
socket.on('message', function (msg) {
sendMessage('my-queue', msg);
});
connection.queue('my-queue', {autoDelete: false}, function (q) {
q.bind('#');
q.subscribe(function (message) {
socket.broadcast.emit('news',message);
});
});
});
});
})()
- On the index.html page, I connect to the socket server
- I have a button that send a message
- I open two different browser on the index page, and my users are both connected
- If I send a message to the server, it send it to the other users
- If I send a second message to the server, it send the message to the user that sent the message.
It's switching, every pair message (because I have two users), the other users get the message, if it's an impair message, the current user sending the message receive the message. What is this behaviour ?
Can you help me correcting my code to implement my needs a good way ?
NB : I use RabbitMQ with standard config on a windows 7 x64 computer
EDIT : I made a solution and every consumer can get the message with :
(function () {
var amqp = require('amqp');
var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' });
var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);
app.get('/', function (req, res) {
res.sendfile(__dirname + '/index.html');
});
server.listen(8888);
// Wait for connection to become established.
connection.on('ready', function () {
connection.exchange('logs', {type: 'fanout', autoDelete: false}, function (exchange) {
var sendMessage = function (queue, msg) {
exchange.publish(queue, JSON.stringify(msg));
}
io.sockets.on('connection', function (socket) {
socket.on('message', function (msg) {
sendMessage('', msg);
});
connection.queue(socket.id, {exclusive: true}, function (q) {
q.bind('logs', '');
q.subscribe(function (message) {
socket.emit('news', message);
});
});
});
});
});
})()
My last problem is that I cant manage offline messages now... Any solutions ? (bounty end tomorrow :-/)