Backpressure exception in RxJava code even after applying the proper operator

I am using RxJava in my Android app. I run a timer with interval(), but I get a MissingBackpressureException even though I added onBackpressureDrop(). I also added onError() to my subscriber and log the exception to Crashlytics, but the app still crashes. Please help. I have spent a week trying to track down the problem, without success. The code crashes intermittently, and I have not been able to reproduce it even once. traceOnError is taken directly from an official RxJava git issue. I think it is what is responsible for printing the exception to Crashlytics, although I may be wrong.

My delay Observable -

serviceStartSubscription = Observable.just(1)
                .delay(delay, TimeUnit.SECONDS)
                .onBackpressureDrop()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Log.e("Service", "Service not started");
                        Crashlytics.logException(throwable);
                        Crashlytics.log("Error while starting service in Orders Home class");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        iOmsOrdersHomeScreenActivity.startService();
                    }
                });

My timer Observable -

public static Subscription createTimer(long expiryTime, @NonNull Subscriber<Long> subscriber) {
    return Observable.interval(1, 10, TimeUnit.SECONDS)
            .onBackpressureDrop()
            .lift(traceOnError())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);
}

traceOnError() -

public class OperatorTraceOnError<T> implements Observable.Operator<T, T> {

    private final StackTraceElement[] trace = new Throwable().getStackTrace();

    public static <T> OperatorTraceOnError<T> traceOnError() {
        return new OperatorTraceOnError<>();
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                child.onError(new TracedException(throwable, trace));
            }

            @Override
            public void onNext(T subscriber) {
                child.onNext(subscriber);
            }
        };

        child.add(parent);
        return parent;
    }

    private static class TracedException extends RuntimeException {
        public TracedException(Throwable throwable, StackTraceElement[] trace) {
            super(throwable);
            setStackTrace(trace);
        }
    }
}

The crash -

Caused by rx.exceptions.OnErrorNotImplementedException
rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call (InternalObservableUtils.java:386)
rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call (InternalObservableUtils.java:383)
rx.internal.util.ActionSubscriber.onError (ActionSubscriber.java:44)
rx.observers.SafeSubscriber._onError (SafeSubscriber.java:157)
rx.observers.SafeSubscriber.onError (SafeSubscriber.java:120)
rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated (OperatorObserveOn.java:276)
rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call (OperatorObserveOn.java:219)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
android.os.Handler.handleCallback (Handler.java:733)
dalvik.system.NativeStart.main (NativeStart.java)
Caused by rx.exceptions.MissingBackpressureException
rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext (OperatorObserveOn.java:162)
rx.internal.operators.OperatorSubscribeOn$1$1.onNext (OperatorSubscribeOn.java:53)
rx.internal.operators.OnSubscribeTimerPeriodically$1.call (OnSubscribeTimerPeriodically.java:52)
rx.Scheduler$Worker$1.call (Scheduler.java:134)
rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call (EventLoopsScheduler.java:187)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:422)
java.lang.Thread.run (Thread.java:841)
#0
MarketingHandler
android.os.MessageQueue.nativePollOnce (MessageQueue.java)
android.os.HandlerThread.run (HandlerThread.java:61)
#1
FirebaseWorker
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#2
AsyncTask #3
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#3
RxScheduledExecutorPool-1
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#4
OkHttp ConnectionPool
java.lang.Object.wait (Object.java)
okhttp3.ConnectionPool$1.run (ConnectionPool.java:65)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1112)
java.lang.Thread.run (Thread.java:841)
#5
Binder_2
dalvik.system.NativeStart.run (NativeStart.java)
#6
ManifestHandler
android.os.MessageQueue.nativePollOnce (MessageQueue.java)
android.os.HandlerThread.run (HandlerThread.java:61)
#7
RxIoScheduler-10
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#8
RxIoScheduler-11
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#9
AsyncTask #2
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#10
Okio Watchdog
java.lang.Object.wait (Object.java)
java.lang.Object.wait (Object.java:364)
okio.AsyncTimeout.awaitTimeout (AsyncTimeout.java:311)
okio.AsyncTimeout.access$000 (AsyncTimeout.java:40)
okio.AsyncTimeout$Watchdog.run (AsyncTimeout.java:286)
#11
RxComputationScheduler-2
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#12
AnalyticsHandler
android.os.MessageQueue.nativePollOnce (MessageQueue.java)
android.os.HandlerThread.run (HandlerThread.java:61)
#13
Crashlytics Exception Handler1
dalvik.system.VMStack.getThreadStackTrace (VMStack.java)
java.lang.Thread.getAllStackTraces (Thread.java:521)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler.writeSessionEvent (CrashlyticsUncaughtExceptionHandler.java:1009)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler.writeFatal (CrashlyticsUncaughtExceptionHandler.java:766)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler.handleUncaughtException (CrashlyticsUncaughtExceptionHandler.java:262)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler.access$100 (CrashlyticsUncaughtExceptionHandler.java:55)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler$5.call (CrashlyticsUncaughtExceptionHandler.java:238)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler$5.call (CrashlyticsUncaughtExceptionHandler.java:235)
java.util.concurrent.FutureTask.run (FutureTask.java:237)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:587)
io.fabric.sdk.android.services.common.ExecutorUtils$1$1.onRun (ExecutorUtils.java:75)
io.fabric.sdk.android.services.common.BackgroundPriorityRunnable.run (BackgroundPriorityRunnable.java:30)
java.lang.Thread.run (Thread.java:841)
#14
RxComputationScheduler-4
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#15
RxComputationScheduler-1
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#16
RxSchedulerPurge-1
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#17
RxIoScheduler-3
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#18
RxIoScheduler-2
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#19
Binder_1
dalvik.system.NativeStart.run (NativeStart.java)
#20
main
java.lang.Object.wait (Object.java)
java.util.concurrent.FutureTask.get (FutureTask.java:174)
com.crashlytics.android.core.CrashlyticsExecutorServiceWrapper.executeSyncLoggingException (CrashlyticsExecutorServiceWrapper.java:44)
com.crashlytics.android.core.CrashlyticsUncaughtExceptionHandler.uncaughtException (CrashlyticsUncaughtExceptionHandler.java:235)
java.lang.ThreadGroup.uncaughtException (ThreadGroup.java:693)
java.lang.ThreadGroup.uncaughtException (ThreadGroup.java:690)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:66)
android.os.Handler.handleCallback (Handler.java:733)
dalvik.system.NativeStart.main (NativeStart.java)
#21
ProfileHandler
android.os.MessageQueue.nativePollOnce (MessageQueue.java)
android.os.HandlerThread.run (HandlerThread.java:61)
#22
RxIoScheduler-12
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#23
pool-4-thread-1
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#24
fifo-pool-thread-0
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
com.bumptech.glide.load.engine.executor.FifoPriorityThreadPoolExecutor$DefaultThreadFactory$1.run (FifoPriorityThreadPoolExecutor.java:118)
#25
RxIoScheduler-9
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#26
Queue
java.lang.Object.wait (Object.java)
java.util.concurrent.PriorityBlockingQueue.take (PriorityBlockingQueue.java:510)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.performOperation (DependencyPriorityBlockingQueue.java:197)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.get (DependencyPriorityBlockingQueue.java:236)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:65)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:46)
java.util.concurrent.ThreadPoolExecutor.getTask (ThreadPoolExecutor.java:1035)
java.lang.Thread.run (Thread.java:841)
#27
OkHttp ConnectionPool
java.lang.Object.wait (Object.java)
okhttp3.ConnectionPool$1.run (ConnectionPool.java:65)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1112)
java.lang.Thread.run (Thread.java:841)
#28
RxScheduledExecutorPool-2
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#29
Queue
java.lang.Object.wait (Object.java)
java.util.concurrent.PriorityBlockingQueue.take (PriorityBlockingQueue.java:510)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.performOperation (DependencyPriorityBlockingQueue.java:197)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.get (DependencyPriorityBlockingQueue.java:236)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:65)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:46)
java.util.concurrent.ThreadPoolExecutor.getTask (ThreadPoolExecutor.java:1035)
java.lang.Thread.run (Thread.java:841)
#30
RxIoScheduler-8
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#31
RxIoScheduler-1 (Evictor)
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#32
RxIoScheduler-5
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#33
GC
dalvik.system.NativeStart.run (NativeStart.java)
#34
Queue
java.lang.Object.wait (Object.java)
java.util.concurrent.PriorityBlockingQueue.take (PriorityBlockingQueue.java:510)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.performOperation (DependencyPriorityBlockingQueue.java:197)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.get (DependencyPriorityBlockingQueue.java:236)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:65)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:46)
java.util.concurrent.ThreadPoolExecutor.getTask (ThreadPoolExecutor.java:1035)
java.lang.Thread.run (Thread.java:841)
#35
AsyncTask #1
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#36
LocationHandler
android.os.MessageQueue.nativePollOnce (MessageQueue.java)
android.os.HandlerThread.run (HandlerThread.java:61)
#37
ReferenceQueueDaemon
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#38
RxComputationScheduler-3
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#39
Compiler
dalvik.system.NativeStart.run (NativeStart.java)
#40
TubeSockWriter-2
java.lang.Object.wait (Object.java)
java.util.concurrent.LinkedBlockingQueue.take (LinkedBlockingQueue.java:410)
com.firebase.tubesock.WebSocketWriter.writeMessage (WebSocketWriter.java:126)
com.firebase.tubesock.WebSocketWriter.runWriter (WebSocketWriter.java:141)
com.firebase.tubesock.WebSocketWriter.access$000 (WebSocketWriter.java:22)
com.firebase.tubesock.WebSocketWriter$1.run (WebSocketWriter.java:36)
java.lang.Thread.run (Thread.java:841)
#41
RxIoScheduler-6
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#42
FinalizerDaemon
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#43
Signal Catcher
dalvik.system.NativeStart.run (NativeStart.java)
#44
Binder_3
dalvik.system.NativeStart.run (NativeStart.java)
#45
RxIoScheduler-7
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#46
Queue
java.lang.Object.wait (Object.java)
java.util.concurrent.PriorityBlockingQueue.take (PriorityBlockingQueue.java:510)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.performOperation (DependencyPriorityBlockingQueue.java:197)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.get (DependencyPriorityBlockingQueue.java:236)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:65)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:46)
java.util.concurrent.ThreadPoolExecutor.getTask (ThreadPoolExecutor.java:1035)
java.lang.Thread.run (Thread.java:841)
#47
StethoListener-main
android.net.LocalSocketImpl.accept (LocalSocketImpl.java)
android.net.LocalServerSocket.accept (LocalServerSocket.java:94)
com.facebook.stetho.server.LocalSocketServer.listenOnAddress (LocalSocketServer.java:85)
com.facebook.stetho.server.LocalSocketServer.run (LocalSocketServer.java:74)
com.facebook.stetho.server.ServerManager$1.run (ServerManager.java:40)
#48
Answers Events Handler1
java.lang.Object.wait (Object.java)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:587)
io.fabric.sdk.android.services.common.ExecutorUtils$1$1.onRun (ExecutorUtils.java:75)
io.fabric.sdk.android.services.common.BackgroundPriorityRunnable.run (BackgroundPriorityRunnable.java:30)
java.lang.Thread.run (Thread.java:841)
#49
RxIoScheduler-4
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#50
AsyncTask #4
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#51
FinalizerWatchdogDaemon
java.lang.Object.wait (Object.java)
java.lang.Thread.run (Thread.java:841)
#52
TubeSockReader-2
com.android.org.conscrypt.NativeCrypto.SSL_read (NativeCrypto.java)
java.io.DataInputStream.readFully (DataInputStream.java:99)
com.firebase.tubesock.WebSocketReceiver.read (WebSocketReceiver.java:141)
com.firebase.tubesock.WebSocketReceiver.run (WebSocketReceiver.java:34)
com.firebase.tubesock.WebSocket.runReader (WebSocket.java:372)
com.firebase.tubesock.WebSocket.access$000 (WebSocket.java:30)
com.firebase.tubesock.WebSocket$2.run (WebSocket.java:108)
java.lang.Thread.run (Thread.java:841)
#53
Queue
java.lang.Object.wait (Object.java)
java.util.concurrent.PriorityBlockingQueue.take (PriorityBlockingQueue.java:510)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.performOperation (DependencyPriorityBlockingQueue.java:197)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.get (DependencyPriorityBlockingQueue.java:236)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:65)
io.fabric.sdk.android.services.concurrency.DependencyPriorityBlockingQueue.take (DependencyPriorityBlockingQueue.java:46)
java.util.concurrent.ThreadPoolExecutor.getTask (ThreadPoolExecutor.java:1035)
java.lang.Thread.run (Thread.java:841)

person Shitij Goyal    schedule 22.09.2016    source
comment
Try putting onBackpressureDrop() after the lift(...) operator.   -  person R. Zagórski    schedule 22.09.2016
comment
Can you show us the code for traceOnError? Writing operators correctly is very tricky.   -  person Dave Moten    schedule 22.09.2016
comment
I just edited my post. I added the traceOnError code; it was taken from an official RxJava git issue.   -  person Shitij Goyal    schedule 22.09.2016
comment
Your OperatorTraceOnError does not handle backpressure properly, but without it you should not see a MissingBackpressureException. It is also odd that the crash stack trace does not include any onBackpressureDrop methods, so are you sure you are looking at the right method?   -  person akarnokd    schedule 22.09.2016
comment
Yes, I actually use only one Observable with a timer in my app, so I assumed that this Observable was the source of the error. I have one more Observable that uses the delay() operator. Could that be the cause? I have added the code for the delay() Observable to my post. Other than that, I do not use any Observables with "special" capabilities. Mostly just map, take, filter and from().   -  person Shitij Goyal    schedule 22.09.2016
comment
Also, if that is not the case, could you suggest a way to find out which Observable is responsible for this crash?   -  person Shitij Goyal    schedule 22.09.2016


Answers (1)


In OperatorTraceOnError you are implicitly requesting Long.MAX_VALUE from upstream. To pass requests through from the child subscriber, you need to use a different overload of the Subscriber constructor:

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    Subscriber<T> parent = new Subscriber<T>(child) {
        @Override
        public void onCompleted() {
            child.onCompleted();
        }
...
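
The difference between forwarding demand and requesting everything can be illustrated without RxJava. The following is a minimal sketch, assuming Java 9+ and using the JDK's java.util.concurrent.Flow interfaces as a stand-in for RxJava 1.x's Subscriber and Producer. The classes CountingSource, PassThrough and Collector are hypothetical and exist only for this illustration; this is not the code from the question. The downstream Collector asks for 2 items. When the intermediate stage forwards that request, it receives exactly 2. When the stage ignores it and requests Long.MAX_VALUE instead, it receives everything the source has.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Toy model of request forwarding built on java.util.concurrent.Flow.
// NOT RxJava 1.x code; every class name here is hypothetical.
final class RequestForwardingDemo {

    // Source that honours demand: emits 0, 1, 2, ... but never more than
    // requested, and at most LIMIT items in total so the demo terminates.
    static final class CountingSource implements Flow.Publisher<Long> {
        static final long LIMIT = 5;

        @Override
        public void subscribe(Flow.Subscriber<? super Long> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long next;

                @Override
                public void request(long n) {
                    for (long i = 0; i < n && next < LIMIT; i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() { }
            });
        }
    }

    // Intermediate stage, comparable in role to a lifted operator.
    static final class PassThrough implements Flow.Subscriber<Long>, Flow.Subscription {
        private final Flow.Subscriber<? super Long> child;
        private final boolean forwardRequests;
        private Flow.Subscription upstream;

        PassThrough(Flow.Subscriber<? super Long> child, boolean forwardRequests) {
            this.child = child;
            this.forwardRequests = forwardRequests;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            child.onSubscribe(this);
            if (!forwardRequests) {
                // Mimics a stage that ignores downstream demand.
                s.request(Long.MAX_VALUE);
            }
        }

        @Override public void onNext(Long item) { child.onNext(item); }
        @Override public void onError(Throwable t) { child.onError(t); }
        @Override public void onComplete() { child.onComplete(); }

        @Override
        public void request(long n) {
            if (forwardRequests) {
                upstream.request(n); // hand the child's demand to the source
            }
        }

        @Override public void cancel() { upstream.cancel(); }
    }

    // Downstream consumer that asks for a fixed number of items once.
    static final class Collector implements Flow.Subscriber<Long> {
        final List<Long> received = new ArrayList<>();
        private final long demand;

        Collector(long demand) { this.demand = demand; }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
        @Override public void onNext(Long item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    // Runs the pipeline with a consumer that requests 2 items.
    static List<Long> run(boolean forwardRequests) {
        Collector collector = new Collector(2);
        new CountingSource().subscribe(new PassThrough(collector, forwardRequests));
        return collector.received;
    }

    public static void main(String[] args) {
        System.out.println("forwarding: " + run(true));  // [0, 1]
        System.out.println("unbounded:  " + run(false)); // [0, 1, 2, 3, 4]
    }
}
```

In RxJava 1.x, a downstream operator with a bounded buffer, such as observeOn, reports this kind of overrun as a MissingBackpressureException.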
person Dave Moten    schedule 23.09.2016