RxJS Observable с функцией асинхронного подписчика

Я пытаюсь сделать что-то, что кажется простым, но оказывается на удивление трудным.

У меня есть функция для подписки на очередь RabbitMQ. В частности, это функция Channel.consume здесь: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

Он возвращает обещание, которое разрешается с помощью идентификатора подписки, который необходим для отказа от подписки позже, а также имеет аргумент обратного вызова, который вызывается, когда сообщения удаляются из очереди.

Когда я хочу отказаться от подписки из очереди, мне нужно отменить клиента, используя функцию Channel.cancel здесь: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel. Это берет ранее возвращенный идентификатор подписки.

Я хочу обернуть все это в Observable, который подписывается на очередь, когда наблюдаемый подписывается, и отменяет подписку, когда наблюдаемый объект отменяется. Однако это оказывается несколько трудным из-за «двойной асинхронности» природы вызовов (я имею в виду, что они имеют как обратный вызов, так и возвращают обещание).

В идеале код, который я хотел бы написать:

return new Rx.Observable(async (subscriber) => {
  var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
  return async () => {
    await channel.cancel(consumeResult.consumerTag);
  };
});

Однако это невозможно, поскольку этот конструктор не поддерживает функции асинхронного подписчика или логику разрыва.

Я не смог понять этого. Я что-то упустил? Почему это так сложно?

Привет, Алекс


person AlexC    schedule 05.04.2017    source источник


Ответы (1)


Созданному наблюдаемому не нужно ждать разрешения channel.consume обещания, поскольку наблюдатель (это переданный наблюдатель, а не подписчик) вызывается только из предоставленной вами функции.

Однако возвращаемая вами функция отмены подписки должна будет дождаться выполнения этого обещания. И он может делать это внутри, например:

return new Rx.Observable((observer) => {
  var consumeResult = channel.consume(queueName, (message) => observer.next(message));
  return () => {
    consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
  };
});
person cartant    schedule 05.04.2017
comment
Спасибо за ваш ответ. Я также рассмотрел ваше предложение перед тем, как спросить, но моя проблема в том, что ничего не ждет, пока обещание channel.cancel разрешится. Итак, допустим, вызов channel.cancel разрешается только через 3 секунды. Существует вероятность того, что по этому каналу будут получены новые сообщения, но Rx-наблюдатель уже отписался, и поэтому эти сообщения будут потеряны в эфире. Этого я пытаюсь избежать. Есть ли у вас предложения по решению этой проблемы? - person AlexC; 06.04.2017
comment
Я не понимаю, почему это проблема. Согласно разделу Подписка и отказ от подписки в наблюдаемом контракте: < i> Когда наблюдатель отправляет уведомление об отказе от подписки для Observable, Observable будет пытаться прекратить отправку уведомлений наблюдателю. Однако не гарантируется, что Observable не отправит никаких уведомлений наблюдателю после того, как наблюдатель отправит ему уведомление об отказе от подписки. - person cartant; 06.04.2017
comment
Поэтому, если канал продолжает перекачивать сообщения, пока не разрешится отмена, наблюдатель должен их получить. То есть реализация наблюдателя не должна ожидать, что никаких дальнейших сообщений будет отправлено. - person cartant; 06.04.2017
comment
Интересно. Спасибо, что указали на это, я думаю, это был тот момент, который я упустил! - person AlexC; 06.04.2017