Удаленный вызов Rabbitmq с Pika

Я новичок в rabbitmq и пытаюсь понять, как я могу заставить клиент запрашивать сервер с информацией об использовании памяти и ЦП с помощью этого руководства (https://www.rabbitmq.com/tutorials/tutorial-six).-python.html).

Итак, клиент запрашивает ЦП и память (думаю, мне понадобятся две очереди), а сервер отвечает значениями.

Можно ли в любом случае просто создать client.py и server.py в этом случае, используя библиотеку Pika в Python.


person Community    schedule 04.03.2016    source источник


Ответы (1)


Я бы порекомендовал вам следовать первым руководствам по RabbitMQ, если вы еще этого не сделали. Пример RPC основан на концепциях, описанных в предыдущих примерах (прямые очереди, эксклюзивные очереди, подтверждения и т. д.).

Решение RPC, предложенное в руководстве, требует как минимум двух очередей, в зависимости от того, сколько клиентов вы хотите использовать:

  • Одна прямая очередь (rpc_queue), используемая для отправки запросов от клиента к серверу.
  • Одна эксклюзивная очередь для каждого клиента, используемая для получения ответов.

Цикл запроса/ответа:

  • Клиент отправляет сообщение rpc_queue. Каждое сообщение включает свойство reply_to с именем исключительной очереди клиента, на которую сервер должен ответить, и свойство correlation_id, представляющее собой просто уникальный id, используемый для отслеживания запроса.
  • Сервер ожидает сообщений на rpc_queue. Когда приходит сообщение, он подготавливает ответ, добавляет correlation_id к новому сообщению и отправляет его в очередь, определенную в свойстве сообщения reply_to.
  • Клиент ждет в своей эксклюзивной очереди, пока не найдет сообщение с correlation_id, которое было изначально сгенерировано.

Переходя прямо к вашей проблеме, первое, что нужно сделать, это определить формат сообщения, который вы хотите использовать в своих ответах. Вы можете использовать JSON, msgpack или любую другую библиотеку сериализации. Например, при использовании JSON одно сообщение может выглядеть примерно так:

{
    "cpu": 1.2,
    "memory": 0.3
} 

Затем на вашем server.py:

def on_request(channel, method, props, body):
    response = {'cpu': current_cpu_usage(),
                'memory': current_memory_usage()}
    properties = pika.BasicProperties(correlation_id=props.correlation_id)

    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=properties,
                          body=json.dumps(response))
    channel.basic_ack(delivery_tag=method.delivery_tag)

# ...

И на вашем client.py:

class ResponseTimeout(Exception): pass

class Client:
    # similar constructor as `FibonacciRpcClient` from tutorial...

    def on_response(self, channel, method, props, body):
        if self.correlation_id == props.correlation_id:
            self.response = json.loads(body.decode())

    def call(self, timeout=2):
        self.response = None
        self.correlation_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.correlation_id),
                                   body='')

        start_time = time.time()
        while self.response is None:
            if (start_time + timeout) < time.time():
                raise ResponseTimeout()
            self.connection.process_data_events()
        return self.response

Как видите, код почти такой же, как и исходный FibonacciRpcClient. Основные отличия:

  • Мы используем JSON в качестве формата данных для наших сообщений.
  • Наш клиентский метод call() не требует аргумента body (на сервер нечего отправлять)
  • Мы позаботимся о тайм-аутах ответа (если сервер не работает или не отвечает на наши сообщения)

Тем не менее, здесь есть что улучшить:

  • Нет обработки ошибок: например, если клиент «забывает» отправить очередь reply_to, наш сервер выйдет из строя и снова выйдет из строя при перезапуске (сломанное сообщение будет повторно помещено в очередь до тех пор, пока оно не будет подтверждено нашим сервером). )
  • Мы не обрабатываем разорванные соединения (нет механизма переподключения)
  • ...

Вы также можете рассмотреть возможность замены подхода RPC шаблоном публикация/подписка; Таким образом, сервер просто передает информацию о состоянии ЦП/памяти каждые X интервалов времени, и один или несколько клиентов получают обновления.

person el.atomo    schedule 09.03.2016