Hazelcast Jet 0.6.1 - ошибка компиляции с API конвейера customTransform

Я получаю следующую ошибку компиляции с API конвейера customTransform.

Вот пример кода для построения конвейера:

private Pipeline buildPipeline2() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getClientConfig(), START_FROM_OLDEST))
          .addTimestamps((v) ->  getTimeStamp(v), 3000)
          .peek()
          .groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
    .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
    //.aggregate(counting(),(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), formatKey(key), result))
    .aggregate(counting())
    .map((v)-> getMapKey(v))
    .customTransform("test2", ()-> this)
    .drainTo(Sinks.map("Test"));
    //.drainTo(Sinks.files("c:\\data\\op.txt"));
    return p;
  }

Вот пример кода для метода tryProcess():

protected boolean tryProcess(int ordinal, Object item) {
    JetEvent jetEvent = (JetEvent)item;
    Object obj = jetEvent.payload();
    tryEmit(ordinal,item);
    return true;
}

Вот ошибка компиляции.

incompatible types: inferred type does not conform to upper bound(s)
[ERROR] inferred: java.lang.Object
[ERROR] upper bound(s): java.util.Map.Entry

Это компилируется и хорошо работает со следующим кодом.

 .customTransform("test2", ()-> this)
 .drainTo(Sinks.files("c:\\data\\op.txt"));

Однако выдает ошибку компиляции со следующим кодом.

.customTransform("test2", ()-> this)
.drainTo(Sinks.map("Test"));

Не могли бы вы помочь мне решить эту проблему?


person Harshad Murtekar    schedule 20.06.2018    source источник
comment
Скорее всего, это не работает с map Sink, потому что он ожидает полный экземпляр Map.Entry с ключом и значением. И вы извлекаете из него только ключ на шаге .map((v)-> getMapKey(v)), так что только JetEvent объект проходит дальше в приемник. И тогда это показывает ошибку.   -  person Nazar    schedule 21.06.2018


Ответы (1)


customTransform не является типобезопасным. Если параметр типа не может быть выведен, он будет оцениваться как Object. Однако для Sinks.map требуется Map.Entry<K, V>. Чтобы решить эту проблему, добавьте подсказку типа к методу customTransform:

    .<Map.Entry<YourKeyType, YourValueType>customTransform("test2", ()-> this)
    .drainTo(Sinks.map("Test"));

Имейте в виду, что если ваш пользовательский процессор на самом деле не возвращает Map.Entry, он выйдет из строя во время выполнения.

Sinks.files работает, потому что требует Object.

person Oliv    schedule 22.06.2018