Я пытаюсь переработать систему Hazelcast Jet 0.3 DAG, которую я написал несколько недель назад, до версии 0.4 в качестве первого шага по ее замене с партии на поток. Что интересно, внезапно я испытываю странное поведение, при котором я не могу быть уверен, что вершины работают должным образом. Пытаясь разобраться в том, что происходит, я не могу найти вариант, как заглянуть во внутреннюю работу каждой вершины. Есть ли способ вывести из них хоть какие-то сообщения об ошибках?
Пытаясь изолировать проблему, я попытался упростить ее до очень упрощенного "чтения из списка, сопоставить его с картой, записать на карту" DAG. Но все еще безуспешно.
Под моим упрощенным примером, может быть, я совершу очень простую ошибку, которую сразу же заметит кто-то более знающий?
Издатель:
// every second via executorservice:
final IStreamMap<Long, List<byte[]>> data = jet.getMap("data");
data.set(jet.getHazelcastInstance().getAtomicLong("key").getAndIncrement(), myByteArray);
Анализатор:
jet.getList(key.toString()).addAll((List<byte[]>) jet.getMap("data").get(key));
jet.getMap("data").remove(key);
logger.debug("List {} has size: {}", key, jet.getList(key.toString()).size());
final Vertex sourceDataMap = this.newVertex("sourceDataMap", readList(key.toString())).localParallelism(1);
final Vertex parseByteArrayToMap = this.newVertex("parseByteArrayToMap", map(
(byte[] e) -> new AbstractMap.SimpleEntry<>(jet.getHazelcastInstance().getAtomicLong("counter").getAndIncrement(), e)));
final Vertex sinkIntoResultMap = this.newVertex("sinkIntoResultMap", writeMap("result"));
this.edge(between(sourceDataMap, parseByteArrayToMap))
.edge(between(parseByteArrayToMap, sinkIntoResultMap));
Слушатель:
jet.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>)
(EntryEvent<Long, byte[]> entryEvent)
-> logger.debug("Got result: {} at {}",entryEvent.getValue().length, System.currentTimeMillis())
, true);
Генерация данных работает нормально, все в порядке, пока DAG не возьмет на себя ответственность ... но никаких сообщений об ошибках или чего-либо, поступающего от DAG. Какие-либо предложения?
DiagnosticProcessors
, в котором есть описанные вами инструменты. - person Marko Topolnik   schedule 30.06.2017jet
. JetInstance не сериализуем, поэтому не может использоваться как часть лямбда-выражения. Если вы хотите сгенерировать уникальные идентификаторы, вы можете использовать что-то вродеUUID.randomUUID()
. - person Can Gencer   schedule 30.06.2017IllegalArgumentException
в строке, которая создает вершину, упомянутую Can. Так что он терпит неудачу быстро (и громко), даже не получив возможности приступить к работе. - person Marko Topolnik   schedule 30.06.2017randomUUID()
я получаю вывод журнала прослушивателя записей. - person Marko Topolnik   schedule 30.06.2017