Я пытаюсь сделать что-то, что кажется простым, но оказывается на удивление трудным.
У меня есть функция для подписки на очередь RabbitMQ. В частности, это функция Channel.consume здесь: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
Он возвращает обещание, которое разрешается с помощью идентификатора подписки, который необходим для отказа от подписки позже, а также имеет аргумент обратного вызова, который вызывается, когда сообщения удаляются из очереди.
Когда я хочу отказаться от подписки из очереди, мне нужно отменить клиента, используя функцию Channel.cancel здесь: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel. Это берет ранее возвращенный идентификатор подписки.
Я хочу обернуть все это в Observable, который подписывается на очередь, когда наблюдаемый подписывается, и отменяет подписку, когда наблюдаемый объект отменяется. Однако это оказывается несколько трудным из-за «двойной асинхронности» природы вызовов (я имею в виду, что они имеют как обратный вызов, так и возвращают обещание).
В идеале код, который я хотел бы написать:
return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async () => {
await channel.cancel(consumeResult.consumerTag);
};
});
Однако это невозможно, поскольку этот конструктор не поддерживает функции асинхронного подписчика или логику разрыва.
Я не смог понять этого. Я что-то упустил? Почему это так сложно?
Привет, Алекс