Возврат больших данных из RPC (Crossbar + Autobahn|Python)

Я пытаюсь передавать большие данные через веб-сокеты, используя RPC crossbar/autobahn. Моя установка выглядит следующим образом:

  • Питон 2.7
  • Ригельный роутер (версия 17.8.1.post1)
  • Серверная часть, которая попытается отправить большой кадр данных pandas в виде строки json.
  • Внешний интерфейс, который захочет получить эту строку

По сути, мой интерфейс пытается вызвать функцию, которая вернет большую строку.

class MyComponent(ApplicationSession):

@inlineCallbacks
def onJoin(self, details):
    print("session ready")
    try:
        res = yield self.call(u'data.get')

И я получаю эту ошибку:

2017-08-09T16:38:10+0200 session closed with reason wamp.close.transport_lost [WAMP transport was lost without closing the session before]
2017-08-09T16:38:10+0200 Cancelling 1 outstanding requests
2017-08-09T16:38:10+0200 call error: ApplicationError(error=<wamp.close.transport_lost>, args=[u'WAMP transport was lost without closing the session before'], kwargs={}, enc_algo=None)

Похоже, что crossbar выгоняет меня, потому что мой клиентский сеанс выглядит для него мертвым, но я думал, что автобан раздробит мои данные и что вызов не заблокирует клиентский реактор.

Я включил несколько вещей в своей конфигурации кроссбара, чтобы улучшить обработку веб-сокетов; благодаря этому я смог передавать больший объем данных, но в конечном итоге я достиг предела (файл конфигурации в значительной степени скопирован и вставлен из sam & max).

                        "options": {
                            "enable_webstatus": false,
                            "max_frame_size": 16777216,
                            "auto_fragment_size": 65536,
                            "fail_by_drop": true,
                            "open_handshake_timeout": 2500,
                            "close_handshake_timeout": 1000,
                            "auto_ping_interval": 10000,
                            "auto_ping_timeout": 5000,
                            "auto_ping_size": 4,
                            "compression": {
                                "deflate": {
                                    "request_no_context_takeover": false,
                                    "request_max_window_bits": 11,
                                    "no_context_takeover": false,
                                    "max_window_bits": 11,
                                    "memory_level": 4
                               }
                            }
                        }

Есть идеи, дубли, что я делаю не так?

Спасибо,


Код клиента:

from __future__ import print_function
import pandas as pd

from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks


class MyComponent(ApplicationSession):

    @inlineCallbacks
    def onJoin(self, details):
        print("session ready")
        try:
            res = yield self.call(u'data.get')
            print('Got the data')
            data = pd.read_json(res)
            print("call result: {}".format(data.head()))
            print("call result shape: {0}, {1}".format(*data.shape))
        except Exception as e:
            print("call error: {0}".format(e))


if __name__ == "__main__":
    from autobahn.twisted.wamp import ApplicationRunner

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
    runner.run(MyComponent)

Внутренний код

from __future__ import absolute_import, division, print_function

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet import reactor, defer, threads

# Imports
import pandas as pd


def get_data():
    """Returns a DataFrame of stuff as a JSON

    :return: str, data as a JSON string
    """
    data = pd.DataFrame({
        'col1': pd.np.arange(1000000),
        'col2': "I'm big",
        'col3': 'Like really big',
    })
    print("call result shape: {0}, {1}".format(*data.shape))
    print(data.memory_usage().sum())
    print(data.head())
    return data.to_json()


class MyBackend(ApplicationSession):

    def __init__(self, config):
        ApplicationSession.__init__(self, config)

    @inlineCallbacks
    def onJoin(self, details):

        # Register a procedure for remote calling
        @inlineCallbacks
        def async_daily_price(eqt_list):
            res = yield threads.deferToThread(get_data)
            defer.returnValue(res)

        yield self.register(async_daily_price, u'data.get')


if __name__ == "__main__":
    from autobahn.twisted.wamp import ApplicationRunner

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
    runner.run(MyBackend)

Конфигурация

{
"version": 2,
"controller": {},
"workers": [
    {
        "type": "router",
        "realms": [
            {
                "name": "realm1",
                "roles": [
                    {
                        "name": "anonymous",
                        "permissions": [
                            {
                                "uri": "",
                                "match": "prefix",
                                "allow": {
                                    "call": true,
                                    "register": true,
                                    "publish": true,
                                    "subscribe": true
                                },
                                "disclose": {
                                    "caller": false,
                                    "publisher": false
                                },
                                "cache": true
                            }
                        ]
                    }
                ]
            }
        ],
        "transports": [
            {
                "type": "universal",
                "endpoint": {
                    "type": "tcp",
                    "port": 8080
                },
                "rawsocket": {
                },
                "websocket": {
                    "ws": {
                        "type": "websocket",
                        "options": {
                            "enable_webstatus": false,
                            "max_frame_size": 16777216,
                            "auto_fragment_size": 65536,
                            "fail_by_drop": true,
                            "open_handshake_timeout": 2500,
                            "close_handshake_timeout": 1000,
                            "auto_ping_interval": 10000,
                            "auto_ping_timeout": 5000,
                            "auto_ping_size": 4,
                            "compression": {
                                "deflate": {
                                    "request_no_context_takeover": false,
                                    "request_max_window_bits": 11,
                                    "no_context_takeover": false,
                                    "max_window_bits": 11,
                                    "memory_level": 4
                               }
                            }
                        }
                    }
                },
                "web": {
                    "paths": {
                        "/": {
                            "type": "static",
                            }
                        }
                    }
                }
            ]
        }
    ]
}

person A. Ciclet    schedule 09.08.2017    source источник


Ответы (1)


Решение, предложенное группой crossbar.io, заключалось в использовании опции прогрессивного результата RPC.

Полный рабочий пример находится по адресу https://github.com/crossbario/autobahn-python/tree/master/examples/twisted/wamp/rpc/progress

В моем коде мне пришлось добавить фрагментацию результата в бэкэнд.

        step = 10000
        if details.progress and len(res) > step:
            for i in xrange(0, len(res), step):
                details.progress(res[i:i+step])
        else:
            defer.returnValue(res)

И к звонящему

        res = yield self.call(
            u'data.get'
            options=CallOptions(
                on_progress=partial(on_progress, res=res_list)
            )
        )

Где моя функция on_progress добавляет куски в список результатов

def on_progress(x, res):
    res.append(x)

Выбор правильного размера куска сделает свое дело.

person A. Ciclet    schedule 10.08.2017