Я пытаюсь создать реактивный слушатель RabbitMQ, который позволяет нам обрабатывать каждое сообщение с несколькими подписчиками. Мы хотим только ack
отправить сообщение, когда все подписчики закончат успешно.
Это моя текущая установка:
Observable
.fromCallable(() -> {
// Set up connection
return consumer;
})
.flatMap(consumer -> Observable
.fromCallable(consumer::nextDelivery)
.doOnError(throwable -> {
try {
consumer.getChannel().getConnection().close();
} catch (IOException ignored) { }
})
.repeat())
.retryWhen(observable -> observable.delay(3, TimeUnit.SECONDS))
.publish()
.refCount();
Это установит соединение один раз, отправит все сообщения всем подписчикам и повторно подключится через 3 секунды, если это где-то не сработает, например, из-за. Кролик становится недоступным.
Что мне еще нужно сделать, так это ack
или nack
сообщение. Поскольку все наши обработчики сообщений являются идемпотентными, я могу просто повторно поставить сообщение в очередь, если какой-либо обработчик выйдет из строя, чтобы гарантировать успешное завершение каждого обработчика.
Есть ли способ узнать, не сработал ли какой-либо подписчик? В настоящее время я думал о подписке на что-то вроде этого:
public void subscribe(Action1 action) {
deliveries
.flatMap(delivery -> Observable
.just(delivery)
.doOnNext(action)
.doOnError(throwable -> {
// nack
})
.doOnCompleted(() -> {
// ack
})
)
.subscribe();
}
Но это, очевидно, ack
с или nack
с при первой неудаче или успехе. Есть ли способ merge
всех подписчиков определенного сообщения, а затем проверить наличие ошибок или завершения?
Я также пробовал что-то вроде использования AtomicInteger
для подсчета всех подписчиков, а затем подсчета успехов/неуспехов, но очевидно, что всякий раз, когда кто-то подписывается или отказывается от подписки во время обработки, нет тривиального способа синхронизации без блокировки всего этапа обработки.
Я также мог бы дать каждому подписчику Observable<Delivery>
и заставить их возвращать это с ошибкой или завершением, подобно retryWhen
(как своего рода ответный канал), но у меня нет возможности заранее сгенерировать необходимое количество наблюдаемых и объединить их потом.
Любые идеи? Спасибо за чтение!