Как узнать, как работает Hazelcast Jet Vertex?

Я пытаюсь переработать систему Hazelcast Jet 0.3 DAG, которую я написал несколько недель назад, до версии 0.4 в качестве первого шага по ее замене с партии на поток. Что интересно, внезапно я испытываю странное поведение, при котором я не могу быть уверен, что вершины работают должным образом. Пытаясь разобраться в том, что происходит, я не могу найти вариант, как заглянуть во внутреннюю работу каждой вершины. Есть ли способ вывести из них хоть какие-то сообщения об ошибках?

Пытаясь изолировать проблему, я попытался упростить ее до очень упрощенного "чтения из списка, сопоставить его с картой, записать на карту" DAG. Но все еще безуспешно.

Под моим упрощенным примером, может быть, я совершу очень простую ошибку, которую сразу же заметит кто-то более знающий?

Издатель:

// every second via executorservice:
final IStreamMap<Long, List<byte[]>> data = jet.getMap("data");
data.set(jet.getHazelcastInstance().getAtomicLong("key").getAndIncrement(), myByteArray);

Анализатор:

jet.getList(key.toString()).addAll((List<byte[]>) jet.getMap("data").get(key));
jet.getMap("data").remove(key);
logger.debug("List {} has size: {}", key, jet.getList(key.toString()).size());

final Vertex sourceDataMap = this.newVertex("sourceDataMap", readList(key.toString())).localParallelism(1);
final Vertex parseByteArrayToMap = this.newVertex("parseByteArrayToMap", map(
    (byte[] e) ->  new AbstractMap.SimpleEntry<>(jet.getHazelcastInstance().getAtomicLong("counter").getAndIncrement(), e)));
final Vertex sinkIntoResultMap = this.newVertex("sinkIntoResultMap", writeMap("result"));

this.edge(between(sourceDataMap, parseByteArrayToMap))
    .edge(between(parseByteArrayToMap,  sinkIntoResultMap));

Слушатель:

jet.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>)
       (EntryEvent<Long, byte[]> entryEvent)
           -> logger.debug("Got result: {} at {}",entryEvent.getValue().length, System.currentTimeMillis())
       , true);

Генерация данных работает нормально, все в порядке, пока DAG не возьмет на себя ответственность ... но никаких сообщений об ошибках или чего-либо, поступающего от DAG. Какие-либо предложения?


person Anders Bernard    schedule 30.06.2017    source источник
comment
Можете ли вы показать нам MCVE, который продемонстрирует вашу проблему? И вы также можете проверить класс DiagnosticProcessors, в котором есть описанные вами инструменты.   -  person Marko Topolnik    schedule 30.06.2017
comment
У вас есть лямбда, которая относится к jet. JetInstance не сериализуем, поэтому не может использоваться как часть лямбда-выражения. Если вы хотите сгенерировать уникальные идентификаторы, вы можете использовать что-то вроде UUID.randomUUID().   -  person Can Gencer    schedule 30.06.2017
comment
Я попытался выполнить задание из DAG, который вы разместили, и он выдает IllegalArgumentException в строке, которая создает вершину, упомянутую Can. Так что он терпит неудачу быстро (и громко), даже не получив возможности приступить к работе.   -  person Marko Topolnik    schedule 30.06.2017
comment
... и после перехода на randomUUID() я получаю вывод журнала прослушивателя записей.   -  person Marko Topolnik    schedule 30.06.2017
comment
Хорошо, это странно, я не получил исключение IllegalArgumentException. Это именно то, что я ищу. Спасибо.   -  person Anders Bernard    schedule 01.07.2017
comment
И нашел причину, по которой я не получил ошибку. Двойное палачское обертывание. Read runnable в runnable - ›внешний runnable ест внутренние ошибки.   -  person Anders Bernard    schedule 01.07.2017


Ответы (1)


Вот ваш код, слегка обработанный и работающий на моей стороне:

public class Main {
    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.newJetInstance();
        try {
            HazelcastInstance hz = jet.getHazelcastInstance();
            ILogger logger = hz.getLoggingService().getLogger("a");

            // every second via executorservice:
            final IStreamMap<Long, List<byte[]>> data = jet.getMap("data");
            List<byte[]> myByteArray = asList(new byte[1], new byte[2]);
            IAtomicLong keyGen = hz.getAtomicLong("key");
            Long key = keyGen.getAndIncrement();
            data.set(key, myByteArray);

            String stringKey = key.toString();
            hz.getList(stringKey).addAll((List<byte[]>) jet.getMap("data").get(key));
            jet.getMap("data").remove(key);
            logger.severe(String.format("List %s has size: %d", key, jet.getList(stringKey).size()));

            hz.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>)
                    (EntryEvent<Long, byte[]> entryEvent) -> logger.severe(String.format(
                            "Got result: %d at %d", entryEvent.getValue().length, System.currentTimeMillis())),
                    true);

            DAG dag = new DAG();
            Vertex sourceDataMap = dag.newVertex("sourceDataMap", readList(stringKey)).localParallelism(1);
            Vertex parseByteArrayToMap = dag.newVertex("parseByteArrayToMap", map(
                    (byte[] e) -> entry(randomUUID(), e)));
            Vertex sinkIntoResultMap = dag.newVertex("sinkIntoResultMap", writeMap("result"));

            dag.edge(between(sourceDataMap, parseByteArrayToMap))
               .edge(between(parseByteArrayToMap, sinkIntoResultMap));

            jet.newJob(dag).execute().get();
            Thread.sleep(1000);
        } finally {
            Jet.shutdownAll();
        }
    }
}

В консоли я вижу:

SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] List 0 has size: 2
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 2 at 1498822322228
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 1 at 1498822322228
person Marko Topolnik    schedule 30.06.2017