Как реализовать двухсторонний jsonrpc + скрученный сервер/клиент

Здравствуйте, я работаю над разработкой rpc-сервера на основе Twisted для обслуживания нескольких микроконтроллеров, которые вызывают вызов RPC на JSONRPC-сервер Twisted. Но приложение также требовало, чтобы сервер отправлял информацию каждому микро в любое время, поэтому вопрос заключается в том, как можно было бы сделать хорошую практику, чтобы предотвратить путаницу ответа на удаленный вызов jsonrpc от микро с запросом сервера jsonrpc, который сделан для Пользователь.

Следствием того, что я имею сейчас, является то, что микроконтроллеры получают неверную информацию, потому что они не знают, является ли строка netstring/json, поступающая из сокета, их ответом на предыдущее требование или новым запросом с сервера.

Вот мой код:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds  = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass

    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False

        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()

    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}

    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """  
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients    =   []
        self.references =   {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory =   rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList    =   ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """ 
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto   =   self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend  = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break

factory = rpcfactory(arduinoRPC)

"""start thread caller""" 
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()

person Jaime    schedule 08.12.2010    source источник


Ответы (1)


Вам нужно добавить достаточно информации к каждому сообщению, чтобы получатель мог определить, как его интерпретировать. Ваши требования очень похожи на требования AMP, поэтому вы можете либо используйте вместо этого AMP, либо используйте ту же структуру, что и AMP, для идентификации ваших сообщений. Конкретно:

  • В запросах укажите определенный ключ — например, AMP использует «_ask» для идентификации запросов. Это также дает им уникальное значение, которое дополнительно идентифицирует этот запрос на время существования соединения.
  • В ответах ставьте другой ключ — например, AMP для этого использует «_answer». Значение совпадает со значением из ключа «_ask» в запросе, для которого предназначен ответ.

Используя такой подход, вам просто нужно посмотреть, есть ли ключ «_ask» или ключ «_answer», чтобы определить, получили ли вы новый запрос или ответ на предыдущий запрос.

В отдельной теме ваш класс asyncGatewayCalls не должен быть основан на потоках. У него нет очевидной причины использовать потоки, и, таким образом, он также неправильно использует Twisted API, что приведет к неопределенному поведению. Большинство Twisted API можно использовать только в том потоке, в котором вы вызвали reactor.run. Единственным исключением является reactor.callFromThread, который вы можете использовать для отправки сообщения в поток-реактор из любого другого потока. Однако asyncGatewayCalls пытается записать в транспорт, что приведет к повреждению буфера или произвольным задержкам в отправляемых данных, или, возможно, к худшим вещам. Вместо этого вы можете написать asyncGatewayCalls так:

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run():
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

Это дает вам однопоточное решение, которое должно вести себя так же, как вы предполагали для исходного класса asyncGatewayCalls. Однако вместо того, чтобы засыпать в цикле в потоке для планирования вызовов, он использует API-интерфейсы планирования реактора (через класс LoopingCall более высокого уровня, который планирует повторные вызовы), чтобы убедиться, что _pokeMicro вызывается каждые десять секунд. .

person Jean-Paul Calderone    schedule 08.12.2010
comment
Да, вы правы, несколько часов назад я сделал такой же вывод, прочитав документацию по API потока (task.LoopingCall). Я проверил, и это сработало довольно хорошо. спасибо за помощь - person Jaime; 10.12.2010
comment
Разве поле jsonrpc 'id' не подходит для определения того, какой ответ для какого запроса? - person Andy Grover; 21.04.2012
comment
Сообщения JSON-RPC 2.0 можно четко различить. Уведомления - запросы без поля id, ответы тоже не те (результат и ошибка). - person Dr.eel; 13.04.2017