Я пытаюсь передавать большие данные через веб-сокеты, используя RPC crossbar/autobahn. Моя установка выглядит следующим образом:
- Питон 2.7
- Ригельный роутер (версия 17.8.1.post1)
- Серверная часть, которая попытается отправить большой кадр данных pandas в виде строки json.
- Внешний интерфейс, который захочет получить эту строку
По сути, мой интерфейс пытается вызвать функцию, которая вернет большую строку.
class MyComponent(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("session ready")
try:
res = yield self.call(u'data.get')
И я получаю эту ошибку:
2017-08-09T16:38:10+0200 session closed with reason wamp.close.transport_lost [WAMP transport was lost without closing the session before]
2017-08-09T16:38:10+0200 Cancelling 1 outstanding requests
2017-08-09T16:38:10+0200 call error: ApplicationError(error=<wamp.close.transport_lost>, args=[u'WAMP transport was lost without closing the session before'], kwargs={}, enc_algo=None)
Похоже, что crossbar выгоняет меня, потому что мой клиентский сеанс выглядит для него мертвым, но я думал, что автобан раздробит мои данные и что вызов не заблокирует клиентский реактор.
Я включил несколько вещей в своей конфигурации кроссбара, чтобы улучшить обработку веб-сокетов; благодаря этому я смог передавать больший объем данных, но в конечном итоге я достиг предела (файл конфигурации в значительной степени скопирован и вставлен из sam & max).
"options": {
"enable_webstatus": false,
"max_frame_size": 16777216,
"auto_fragment_size": 65536,
"fail_by_drop": true,
"open_handshake_timeout": 2500,
"close_handshake_timeout": 1000,
"auto_ping_interval": 10000,
"auto_ping_timeout": 5000,
"auto_ping_size": 4,
"compression": {
"deflate": {
"request_no_context_takeover": false,
"request_max_window_bits": 11,
"no_context_takeover": false,
"max_window_bits": 11,
"memory_level": 4
}
}
}
Есть идеи, дубли, что я делаю не так?
Спасибо,
Код клиента:
from __future__ import print_function
import pandas as pd
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks
class MyComponent(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("session ready")
try:
res = yield self.call(u'data.get')
print('Got the data')
data = pd.read_json(res)
print("call result: {}".format(data.head()))
print("call result shape: {0}, {1}".format(*data.shape))
except Exception as e:
print("call error: {0}".format(e))
if __name__ == "__main__":
from autobahn.twisted.wamp import ApplicationRunner
runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
runner.run(MyComponent)
Внутренний код
from __future__ import absolute_import, division, print_function
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet import reactor, defer, threads
# Imports
import pandas as pd
def get_data():
"""Returns a DataFrame of stuff as a JSON
:return: str, data as a JSON string
"""
data = pd.DataFrame({
'col1': pd.np.arange(1000000),
'col2': "I'm big",
'col3': 'Like really big',
})
print("call result shape: {0}, {1}".format(*data.shape))
print(data.memory_usage().sum())
print(data.head())
return data.to_json()
class MyBackend(ApplicationSession):
def __init__(self, config):
ApplicationSession.__init__(self, config)
@inlineCallbacks
def onJoin(self, details):
# Register a procedure for remote calling
@inlineCallbacks
def async_daily_price(eqt_list):
res = yield threads.deferToThread(get_data)
defer.returnValue(res)
yield self.register(async_daily_price, u'data.get')
if __name__ == "__main__":
from autobahn.twisted.wamp import ApplicationRunner
runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
runner.run(MyBackend)
Конфигурация
{
"version": 2,
"controller": {},
"workers": [
{
"type": "router",
"realms": [
{
"name": "realm1",
"roles": [
{
"name": "anonymous",
"permissions": [
{
"uri": "",
"match": "prefix",
"allow": {
"call": true,
"register": true,
"publish": true,
"subscribe": true
},
"disclose": {
"caller": false,
"publisher": false
},
"cache": true
}
]
}
]
}
],
"transports": [
{
"type": "universal",
"endpoint": {
"type": "tcp",
"port": 8080
},
"rawsocket": {
},
"websocket": {
"ws": {
"type": "websocket",
"options": {
"enable_webstatus": false,
"max_frame_size": 16777216,
"auto_fragment_size": 65536,
"fail_by_drop": true,
"open_handshake_timeout": 2500,
"close_handshake_timeout": 1000,
"auto_ping_interval": 10000,
"auto_ping_timeout": 5000,
"auto_ping_size": 4,
"compression": {
"deflate": {
"request_no_context_takeover": false,
"request_max_window_bits": 11,
"no_context_takeover": false,
"max_window_bits": 11,
"memory_level": 4
}
}
}
}
},
"web": {
"paths": {
"/": {
"type": "static",
}
}
}
}
]
}
]
}