Проверьте, не выдает ли какой-либо подписчик исключение в RxJava.

Я пытаюсь создать реактивный слушатель RabbitMQ, который позволяет нам обрабатывать каждое сообщение с несколькими подписчиками. Мы хотим только ack отправить сообщение, когда все подписчики закончат успешно.

Это моя текущая установка:

Observable
    .fromCallable(() -> {
        // Set up connection
        return consumer;
    })
    .flatMap(consumer -> Observable
        .fromCallable(consumer::nextDelivery)
        .doOnError(throwable -> {
            try {
                consumer.getChannel().getConnection().close();
            } catch (IOException ignored) { }
        })
        .repeat())
    .retryWhen(observable -> observable.delay(3, TimeUnit.SECONDS))
    .publish()
    .refCount();

Это установит соединение один раз, отправит все сообщения всем подписчикам и повторно подключится через 3 секунды, если это где-то не сработает, например, из-за. Кролик становится недоступным.

Что мне еще нужно сделать, так это ack или nack сообщение. Поскольку все наши обработчики сообщений являются идемпотентными, я могу просто повторно поставить сообщение в очередь, если какой-либо обработчик выйдет из строя, чтобы гарантировать успешное завершение каждого обработчика.

Есть ли способ узнать, не сработал ли какой-либо подписчик? В настоящее время я думал о подписке на что-то вроде этого:

public void subscribe(Action1 action) {
    deliveries
        .flatMap(delivery -> Observable
            .just(delivery)
            .doOnNext(action)
            .doOnError(throwable -> {
                // nack
            })
            .doOnCompleted(() -> {
                // ack
            })
        )
        .subscribe();
}

Но это, очевидно, ackс или nackс при первой неудаче или успехе. Есть ли способ merge всех подписчиков определенного сообщения, а затем проверить наличие ошибок или завершения?

Я также пробовал что-то вроде использования AtomicInteger для подсчета всех подписчиков, а затем подсчета успехов/неуспехов, но очевидно, что всякий раз, когда кто-то подписывается или отказывается от подписки во время обработки, нет тривиального способа синхронизации без блокировки всего этапа обработки.

Я также мог бы дать каждому подписчику Observable<Delivery> и заставить их возвращать это с ошибкой или завершением, подобно retryWhen (как своего рода ответный канал), но у меня нет возможности заранее сгенерировать необходимое количество наблюдаемых и объединить их потом.

Любые идеи? Спасибо за чтение!


person Kamiel Wanrooij    schedule 18.08.2016    source источник


Ответы (2)


Вы можете использовать onErrorResumeNext для управления исключением, распространяемым из вашего конвейера, и установить nack, а затем использовать onComplete как ack

Вот пример

Observable.just(null).map(Object::toString)
                     .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                     .retryWhen(errors -> errors.doOnNext(o -> count++)
                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                     .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world!");
                                          })
                     .subscribe(s -> System.out.println(s));
person paul    schedule 18.08.2016
comment
Спасибо. Учитывает ли это потенциальные ошибки от нескольких подписчиков? - person Kamiel Wanrooij; 18.08.2016
comment
Я действительно не понимаю, как я мог бы применить это. Я поставил onErrorResumeNext до или после refCount? И как я узнаю, что все подписчики успешно обработали сообщение, чтобы его ack? Я не хочу, чтобы подписчики закрывались, потому что от кролика будут приходить последующие сообщения. - person Kamiel Wanrooij; 18.08.2016

Вы хотите использовать Observable.mergeDelayError, а затем один из .onError* методов.

Первый будет распространять ошибку только после того, как все наблюдаемые будут завершены/ошибки; второй позволит вам обработать ошибку после завершения обработки.

Изменить: чтобы получить подсчет, подсчитайте успехи:

Object message = ...;
List<Action1<?>> actions = ...;
Observable.from(actions)
 .map(action ->
    Observable.defer(() -> Observable.just(action.call(message))
    .ignoreEmements()
    .cast(Integer.class)
    .switchIfEmpty(1)
    .onErrorReturnResumeNext(e->Observable.empty())
 )
 .compose(Observable::merge)
 .count();

Это немного запутано, и его можно было бы сделать более понятным: совершать звонки, игнорировать ошибки от них, считать успехи.

person Tassos Bassoukos    schedule 18.08.2016
comment
Да, это звучит как правильно. Я все еще пытаюсь найти способ получить правильное количество наблюдаемых. Как узнать, сколько подписчиков получит сообщение? Только refCount имеет точный номер подписчика AFAIK, и это блокирует каждое сообщение для этого. - person Kamiel Wanrooij; 19.08.2016
comment
Спасибо за редактирование. Если кто-то подпишется во время обработки потока, он не получит сообщение, но подсчет будет включать его и будет не синхронизирован? У меня нет списка подписчиков, я позволяю RxJava обрабатывать это, но, глядя на ваш пример, мне, возможно, придется самому отслеживать подписчиков и явно отправлять их, чтобы это работало. - person Kamiel Wanrooij; 19.08.2016