У меня проблемы с версией реактора 2.0. А именно, я пытаюсь настроить реактивный поток сигналов, который разветвляет сигнал в пул ожидающих потоков. Я очень хорошо знаком с Rx и с Reactive Cocoa, но здесь мне не хватает чего-то базового.
У меня есть базовое преобразование, которое выглядит следующим образом:
WorkQueueDispatcher dispatcher = new WorkQueueDispatcher("dispatch", 10, 64, {... Exception handle code here ...}
return objectStream
.partition(partitions)
.dispatchOn(dispatcher)
.merge()
.map(new Function<Object, Object>() {
@Override
public Object apply(Object o) {
try {
return extension.eval(o, null);
} catch (UnableToEvaluateException e) {
e.printStackTrace();
return null;
}
}
});
Я пробовал этот поток примерно семью или восемью различными способами, включая разные диспетчеры и т. д. Я пытался разбить его на сгруппированный поток событий и обрабатывать каждый элемент отдельно, а затем записывать в отдельный поток для обработки. В любой ситуации я либо вижу, что все запросы обрабатываются в одном и том же потоке (который работает, а не многопоточен), либо получаю сообщение об ошибке, которое меня пугает:
java.lang.IllegalStateException: Dispatcher provided doesn't support event ordering. For concurrent signal dispatching, refer to #partition()/groupBy() method and assign individual single dispatchers.
at reactor.core.support.Assert.state(Assert.java:387)
at reactor.rx.Stream.dispatchOn(Stream.java:720)
at reactor.rx.Stream.dispatchOn(Stream.java:650)
Я пробовал следующее:
- вручную делая раздел/группу.
- явно устанавливая отдельный однопоточный диспетчер (кольцо) для более ранних шагов.
- Просто говорю «фф», не работает, а просто сбрасывает в свою очередь для обработки.
Что мне здесь не хватает? Должен ли я не использовать вещательную компанию для запуска цикла сообщений? Я действительно не забочусь об исполнении заказа здесь.
(отредактировано)
Вот что я делаю со своим доморощенным кодом для масштабирования:
objectStream
.consume(new Consumer<Object>() {
@Override
public void accept(Object o) {
final Object target = o;
tpe.execute(new Runnable(){
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p/>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
//System.out.println("On thread "+ Thread.currentThread().getName());
Timer.Context onNext = onNextTimer.time();
Timer.Context timer = callComponentTimer.time();
Object translated = extension.eval(target, null);
timer.close();
broadcaster.onNext(translated);
onNext.close();
} catch (UnableToEvaluateException e) {
e.printStackTrace();
}
}
});
изменить
Хорошо, я обновил его следующим образом:
MetricRegistry reg = DMPContext.getContext().getMetricRegistry();
de.init(null);
ConsoleReporter reporter = ConsoleReporter.forRegistry(DMPContext.getContext().getMetricRegistry())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
reporter.start(10, TimeUnit.SECONDS);
final CountDownLatch latch = new CountDownLatch(COUNT);
final Function<String, Object> translator = JSON.from(Request.class);
String content = new String(Files.readAllBytes(Paths.get("/svn/DMPidea/Request.json")));
Broadcaster<String> stringBroadcaster = Broadcaster.create();
final Exec exec = new Exec();
stringBroadcaster
.partition(10)
.consume(new Consumer<GroupedStream<Integer, String>>() {
@Override
public void accept(GroupedStream<Integer, String> groupedStream) {
groupedStream.dispatchOn(Environment.cachedDispatcher()).map(translator).map(new Function<Object, Object>() {
@Override
public Object apply(Object o) {
try {
System.out.println("Got thread " +Thread.currentThread().getName());
return de.eval(o, null);
} catch (UnableToEvaluateException e) {
e.printStackTrace();
return null;
}
}
}).consume(new Consumer<Object>() {
@Override
public void accept(Object o) {
latch.countDown();
}
});
}
});
for (int i=0; i<COUNT; i++)
{
stringBroadcaster.onNext(content);
}
latch.await();
Я все еще вижу однопоточное выполнение:
Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1