дамп потока реактора lmax

Попытка понять ниже (частично) трассировку стека. Использование реактора весеннего проекта 2.0.4.

У меня есть потоки планировщика задач, которые должны создавать задачи и распределять задачи для рабочих потоков. В этот момент приложение зависает (рабочие потоки ничего не регистрируют, дампы потоков, сделанные в разное время, всегда одинаковы и т. д.).

Могу ли я заявить, что кольцевой буфер заполнен, основываясь на строке :at reactor.jarjar.com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)

если кольцевой буфер заполнен, почему мои рабочие потоки не выполняют ни одну из этих задач? Может ли состояние кольцевого буфера испортиться?

    "task-scheduler-9" prio=10 tid=0x00007f2e78aa7000 nid=0x3a7a waiting on condition [0x00007f2e651b6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:349)
        at reactor.jarjar.com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
        at reactor.jarjar.com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
        at reactor.jarjar.com.lmax.disruptor.RingBuffer.next(RingBuffer.java:246)
        at reactor.core.dispatch.WorkQueueDispatcher.allocateTask(WorkQueueDispatcher.java:172)
        at reactor.core.dispatch.AbstractLifecycleDispatcher.dispatch(AbstractLifecycleDispatcher.java:117)
        at reactor.core.dispatch.AbstractLifecycleDispatcher.execute(AbstractLifecycleDispatcher.java:133)
        at reactor.spring.core.task.AbstractAsyncTaskExecutor.execute(AbstractAsyncTaskExecutor.java:293)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)


"pollerExecutor-19" daemon prio=10 tid=0x00007f2e78ba2000 nid=0x3a6f waiting on condition [0x00007f2e65bc0000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007016bd818> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at reactor.jarjar.com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:45)
        at reactor.jarjar.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
        at reactor.jarjar.com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

   Locked ownable synchronizers:
        - <0x00000007011e7058> (a java.util.concurrent.ThreadPoolExecutor$Worker)

person fitifiti    schedule 24.02.2016    source источник


Ответы (1)


Если вы посмотрите на MultiProducerSequencer.java # 136 мы видим точку переноса кольцевого буфера > последовательность стробирования. Disruptor: Gating Sequence говорит, что все дело в том, что издатели перезаписывают события, которые не были обработаны. Как вы сказали, почему мои рабочие потоки не выполняют ни одну из этих задач? Я думаю, вам нужно взглянуть на ваш onEvent код, который реализует ваш EventHandler<T>

person rupweb    schedule 03.03.2016
comment
спасибо за ответ. Я могу неправильно понять ваш ответ. Издатели перезаписывают события: действительно ли это происходит? то есть события перезаписываются, если они не обрабатываются вовремя? Я бы подумал, что издатели будут заблокированы и будут просто ждать обработки событий. - person fitifiti; 11.03.2016
comment
Посмотрите на дизайн кругового буфера. Все дело в том, что нет никакой блокировки, НО указатели чтения/записи обрабатываются материалом последовательности стробирования. Я предполагаю, что Disruptor можно настроить на перезапись, даже если указатель чтения не сдвинулся... верно? Дело в том, что в вашем случае похоже, что у вас проблема с обработчиком событий потребителя, а не проблема издателя. Это для вас, чтобы исследовать. Как вы говорите: почему мои рабочие потоки (потребители) не выполняют ни одну из этих задач? Я не знаю... - person rupweb; 15.03.2016
comment
Я использую стороннюю библиотеку под названием Reactor, которая использует lmax, поэтому у меня нет слишком большого контроля/знаний о том, что происходит за кулисами. Думаю, мне придется еще немного почитать. Спасибо за указатели. - person fitifiti; 16.03.2016