Я бы порекомендовал вам следовать первым руководствам по RabbitMQ, если вы еще этого не сделали. Пример RPC основан на концепциях, описанных в предыдущих примерах (прямые очереди, эксклюзивные очереди, подтверждения и т. д.).
Решение RPC, предложенное в руководстве, требует как минимум двух очередей, в зависимости от того, сколько клиентов вы хотите использовать:
- Одна прямая очередь (
rpc_queue
), используемая для отправки запросов от клиента к серверу.
- Одна эксклюзивная очередь для каждого клиента, используемая для получения ответов.
Цикл запроса/ответа:
- Клиент отправляет сообщение
rpc_queue
. Каждое сообщение включает свойство reply_to
с именем исключительной очереди клиента, на которую сервер должен ответить, и свойство correlation_id
, представляющее собой просто уникальный id, используемый для отслеживания запроса.
- Сервер ожидает сообщений на
rpc_queue
. Когда приходит сообщение, он подготавливает ответ, добавляет correlation_id
к новому сообщению и отправляет его в очередь, определенную в свойстве сообщения reply_to
.
- Клиент ждет в своей эксклюзивной очереди, пока не найдет сообщение с
correlation_id
, которое было изначально сгенерировано.
Переходя прямо к вашей проблеме, первое, что нужно сделать, это определить формат сообщения, который вы хотите использовать в своих ответах. Вы можете использовать JSON, msgpack или любую другую библиотеку сериализации. Например, при использовании JSON одно сообщение может выглядеть примерно так:
{
"cpu": 1.2,
"memory": 0.3
}
Затем на вашем server.py
:
def on_request(channel, method, props, body):
response = {'cpu': current_cpu_usage(),
'memory': current_memory_usage()}
properties = pika.BasicProperties(correlation_id=props.correlation_id)
channel.basic_publish(exchange='',
routing_key=props.reply_to,
properties=properties,
body=json.dumps(response))
channel.basic_ack(delivery_tag=method.delivery_tag)
# ...
И на вашем client.py
:
class ResponseTimeout(Exception): pass
class Client:
# similar constructor as `FibonacciRpcClient` from tutorial...
def on_response(self, channel, method, props, body):
if self.correlation_id == props.correlation_id:
self.response = json.loads(body.decode())
def call(self, timeout=2):
self.response = None
self.correlation_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.correlation_id),
body='')
start_time = time.time()
while self.response is None:
if (start_time + timeout) < time.time():
raise ResponseTimeout()
self.connection.process_data_events()
return self.response
Как видите, код почти такой же, как и исходный FibonacciRpcClient
. Основные отличия:
- Мы используем JSON в качестве формата данных для наших сообщений.
- Наш клиентский метод
call()
не требует аргумента body
(на сервер нечего отправлять)
- Мы позаботимся о тайм-аутах ответа (если сервер не работает или не отвечает на наши сообщения)
Тем не менее, здесь есть что улучшить:
- Нет обработки ошибок: например, если клиент «забывает» отправить очередь
reply_to
, наш сервер выйдет из строя и снова выйдет из строя при перезапуске (сломанное сообщение будет повторно помещено в очередь до тех пор, пока оно не будет подтверждено нашим сервером). )
- Мы не обрабатываем разорванные соединения (нет механизма переподключения)
- ...
Вы также можете рассмотреть возможность замены подхода RPC шаблоном публикация/подписка; Таким образом, сервер просто передает информацию о состоянии ЦП/памяти каждые X интервалов времени, и один или несколько клиентов получают обновления.
person
el.atomo
schedule
09.03.2016