одновременное потребление коллекции с использованием реактора

время от времени мне приходится внедрять классическое решение для параллельных производителей и потребителей в рамках проекта, в котором я участвую, и проблема в значительной степени уменьшается за счет наличия некоторой коллекции, которая заполняется из нескольких потоков и потребляется несколькими потребителями. Вкратце, коллекция, скажем, ограничена 10 тыс. Сущностей, после того, как размер буфера будет достигнут, рабочая задача будет отправлена, потребляя эти 10 тыс. Сущностей, есть предел таких рабочих, говорят, что он установлен на 10, что в худшем случае означает, что я могу иметь до 10 рабочих, каждый из которых потребляет 10 тыс. сущностей.

Мне нужно поиграть с некоторой блокировкой здесь и там, некоторые проверки переполнения буфера (случай, когда производители генерируют слишком много данных, пока все рабочие заняты обработкой своих фрагментов), поэтому мне приходится отбрасывать новые события, чтобы избежать OOM (не лучшее решение, но стабильность p1;))

В последнее время искал реактор и способ использовать его вместо перехода на низкий уровень и делать все, что описано выше, поэтому глупый вопрос: «Можно ли использовать реактор для этого варианта использования?» пока забудьте о переполнении / отбрасывании .. как я могу достичь N потребителей для вещательной компании?

особенно внимательно смотрел вещатель с буфером + диспетчер пула потоков:

void test() {
  final Broadcaster<String> sink =   Broadcaster.create(Environment.initialize());
  Dispatcher dispatcher = Environment.newDispatcher(2048, 20, DispatcherType.WORK_QUEUE);

  sink
    .buffer(100)
    .consumeOn(dispatcher, this::log);

  for (int i=0; i<100000; i++) {
    sink.onNext("elementent " + i);
    if (i%1000 == 0) {
      System.out.println("addded elements " + i);
    }
  }
}
 void log(List<String> values) {
  System.out.print("simulating slow processing....");
  System.out.println("processing: " + Arrays.toString(values.toArray()));
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

Мое намерение состоит в том, чтобы вещатель выполнял журнал (..) асинхронно, когда был достигнут размер буфера, однако похоже, что он всегда выполняет журнал (...) в режиме блокировки. выполнение 100 после выполнения следующих 100 и так далее .. как я могу сделать это асинхронным?

спасибо вывалиты


person vyvalyty    schedule 20.03.2016    source источник


Ответы (1)


Возможный шаблон - использовать flatMap с publishOn:

Flux.range(1, 1_000_000)
.buffer(100)
.flatMap(b -> Flux.just(b).publishOn(SchedulerGroup.io())
   .doOnNext(this::log))
.consume(...);
person akarnokd    schedule 30.03.2016