Отправка сообщений RabbitMQ через веб-сокеты

Ищу несколько примеров кода для решения этой проблемы: -

Хотел бы написать код (Python или Javascript), который действовал бы как подписчик на очередь RabbitMQ, чтобы при получении сообщения он транслировал сообщение через веб-сокеты любому подключенному клиенту.

Я просмотрел Autobahn и node.js (используя "amqp" и "ws "), но не может заставить все работать должным образом. Вот код сервера в javascript с использованием node.js: -

var amqp = require('amqp');
var WebSocketServer = require('ws').Server

var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});

wss.on('connection',function(ws){

    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });

    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});

connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
                    payload = msg.data.toString('utf-8');
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});

Используя этот код, я могу успешно подписаться на очередь в Rabbit и получать любые сообщения, отправленные в очередь. Точно так же я могу подключить к серверу клиент websocket (например, браузер) и отправлять / получать сообщения. НО ... как я могу отправить полезную нагрузку сообщения очереди Rabbit в виде сообщения веб-сокета в указанной точке ("КАК ЭТО СЕЙЧАС ПОСЫЛАЕТСЯ ЧЕРЕЗ ВЕБ-РОКЕТЫ")? Я думаю, это как-то связано с тем, что они застряли в неправильном обратном вызове или их нужно как-то вложить ...?

В качестве альтернативы, если бы это можно было сделать проще в Python (через Autobahn и pika), это было бы здорово.

Спасибо !


person bzo    schedule 04.04.2014    source источник
comment
Это проблема, если веб-страницы обращаются напрямую к RabbitMQ.? потому что более быстрый способ: rabbitmq.com/web-stomp.html   -  person Gabriele Santomaggio    schedule 04.04.2014
comment
Спасибо за предложение. Я видел этот плагин, но я не хочу, чтобы веб-страницы напрямую обращались к сообщениям MQ, потому что их нужно обработать, прежде чем они станут доступными для веб-страницы.   -  person bzo    schedule 06.04.2014


Ответы (1)


Один из способов реализовать вашу систему - использовать python с tornado.

Вот сервер:

    import tornado.ioloop
    import tornado.web
    import tornado.websocket
    import os
    import pika
    from threading import Thread


    clients = []

    def threaded_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"));
        print 'Connected:localhost'
        channel = connection.channel()
        channel.queue_declare(queue="my_queue")
        print 'Consumer ready, on my_queue'
        channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) 
        channel.start_consuming()


    def consumer_callback(ch, method, properties, body):
            print " [x] Received %r" % (body,)
            for itm in clients:
                itm.write_message(body)

    class SocketHandler(tornado.websocket.WebSocketHandler):
        def open(self):
            print "WebSocket opened"
            clients.append(self)
        def on_message(self, message):
            self.write_message(u"You said: " + message)

        def on_close(self):
            print "WebSocket closed"
            clients.remove(self)


    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            print "get page"
            self.render("websocket.html")


application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])

if __name__ == "__main__":
    thread = Thread(target = threaded_rmq)
    thread.start()

    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

и вот html-страница:

<html>
<head>
    <script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
    <script>

    $(document).ready(function() {
      var ws;
       if ('WebSocket' in window) {
           ws = new WebSocket('ws://localhost:8889/ws');
        }
        else if ('MozWebSocket' in window) {
            ws = new MozWebSocket('ws://localhost:8889/ws');
        }
        else {

              alert("<tr><td> your browser doesn't support web socket </td></tr>");

            return;
        }

      ws.onopen = function(evt) { alert("Connection open ...")};

      ws.onmessage = function(evt){
        alert(evt.data);
      };

      function closeConnect(){
          ws.close();
      }


  });
    </script>

</head>

<html>

Итак, когда вы публикуете сообщение в «my_queue», оно перенаправляется на все подключенные веб-страницы.

Я надеюсь это может быть полезно

РЕДАКТИРОВАТЬ**

Здесь https://github.com/Gsantomaggio/rabbitmqexample вы можете найти полный пример.

person Gabriele Santomaggio    schedule 06.04.2014
comment
Отличный ответ - именно то, что мне нужно. Спасибо ! - person bzo; 07.04.2014
comment
Добро пожаловать. Это всего лишь скелет, который вам нужно обработать для закрытого приложения, теперь он не закрывает соединение с rabbitmq! - person Gabriele Santomaggio; 07.04.2014