node-amqp + socketio: как синхронизировать очередь подписки/отписки?

У меня есть приложение angularjs с узлом и экспрессом на стороне сервера. У меня также есть node-amqp и socket.io

Я хочу реализовать следующее поведение

Приложение имеет страницу (маршрут, угловое представление), которая отображает таблицу с данными в реальном времени. Данные обновляются в реальном времени с использованием socket.io и amqp для потоковой передачи данных с сервера rabbitMQ, который находится вне приложения.

Когда пользователь посещает эту страницу/маршрут в браузере

  1. клиент выдает событие сокета «подписаться»
  2. the server, on the socket event “subscribe”,
    • declares a rabbit queue
    • привязывает очередь кроликов к обмену
    • подписывается на сообщения/данные из очереди кролика
    • генерирует событие сокета «данные», отправляя данные обратно пользователю/клиенту

Когда пользователь покидает страницу или, другими словами, меняет маршрут

  1. клиент выдает событие сокета «отписаться»
  2. the server, on the socket event “unsubscribe”,
    • unsubscribes from the queue

Моя проблема: как обеспечить синхронизацию queue.subscribe и queue.unsubscribe? Если пользователь выполняет быструю последовательность изменений маршрута: посетить/уйти/посетить/уйти/посетить/уйти Порядок подписки и отказа от подписки иногда меняется на обратный, и сервер отменяет подписку во второй раз, прежде чем новая подписка будет завершена. Любое предложение? Это то, что я пробовал, но не работает:

Клиентская сторона: controller.js

.controller('WatchdogCtrl', function($scope, watchSocket) {

    var data = {}
    $scope.data = []

    var socket = watchSocket

    socket.emit('subscribe', { exchange: 'bus', key: 'mis.service-state' })
    socket.on('data', function(message) {
        // refreshing  data 
        data[message.payload.id] = message.payload;
        var new-values = [];
        angular.forEach(data, function(value, index) {
            this.push(value);
        }, new-values);

        $scope.data = new-values
        $scope.$apply()
    });

    $scope.$on('$destroy', function (event) {
        // unsubscribe from rabbit queue when leaving 
        socket.emit('unsubscribe')
    });
})

На стороне сервера: server.js

// set up amqp listener
var amqp = require('amqp');
// create rabbitmq connection with amqp
var rabbitMQ = amqp.createConnection({url: "amqp://my:url"});
rabbitMQ.on('ready', function() {
    console.log('Connection to rabbitMQ is ready')
});

// Hook Socket.io into Express
var io = require('socket.io').listen(server);
io.set('log level', 2);
io.of('/watch').on('connection', function(socket) {
    var watchq;
    var defr;
    socket.on('subscribe', function(spec) {
        watchq = rabbitMQ.queue('watch-queue', function(queue) {
            console.log('declare rabbit queue: "' + queue.name +'"');
            console.log('bind queue '+ queue.name + ' to exch=' + spec.exchange + ', key=' + spec.key);

            queue.bind(spec.exchange, spec.key)
            defr = queue.subscribe(function(message, headers, deliveryInfo) {
                     socket.emit('data', {
                        key: deliveryInfo.routingKey,
                        payload: JSON.parse(message.data.toString('utf8'))
                     })
                   }).addCallback(function(ok) { 
                       var ctag = ok.consumerTag; 
                       console.log('subscribed to queue: ' + queue.name + ' ctag = ' + ctag)
                   });

        })
    })

    socket.on('unsubscribe', function() {
        //needs fix: this does not ensure subscribe/unsubscribe synchronization…..
        defr.addCallback(function(ok) {
            console.log('unsubscribe form queue:', watchq.name, ', ctag =', ok.consumerTag)
            watchq.unsubscribe(ok.consumerTag);
        })
    })

});

Сообщения сервера console.log: (visit#3 и leave#3 не синхронизированы)

declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.6418165327049792 //<-- visit#1
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.6418165327049792 //<--leave#1
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.455362161854282 //<-- visit#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#3
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.4509762797970325 //<-- visit#3

person klode    schedule 27.01.2014    source источник


Ответы (1)


У нас очень похожая конфигурация, как у вас. Мы создаем анонимную эксклюзивную очередь со сроком действия, если она не используется. Анонимные очереди получают уникальное имя, сгенерированное для них брокером. Эксклюзивные очереди удаляются, как только клиент отключается (как только канал разрывается). Время истечения срока действия для очередей — это расширение RabbitMQ, но поддерживаемое amqplib, который мы используем. Я уверен, что node-amqp также поддерживает такие расширения.

Также создайте канал (но повторно используйте одно и то же соединение) для каждого сокета. Это дает однозначное соответствие между сокетом и анонимной очередью. Любые привязки к этой очереди эквивалентны привязкам для одного сокета. Из-за этого мы изначально знаем, какой сокет должен получать какие сообщения, без каких-либо специальных соглашений об именах для очередей или проверки ключей маршрутизации и т. д.

Закройте канал RabbitMQ (опять же, не соединение), когда сокет закрыт. Нет необходимости в специальном событии отказа от подписки, хотя мы можем добавить такое событие позже.

Это также означает, что один и тот же браузер может иметь несколько очередей, если в нем открыто несколько вкладок без каких-либо условий гонки.

person Emil Vikström    schedule 11.02.2014