Полный пример брокера сообщений в Lagom

Я пытаюсь реализовать брокер сообщений, настроенный с помощью Lagom 1.2.2, и столкнулся со стеной. В документации есть следующий пример для дескриптора службы:

default Descriptor descriptor() {
return named("helloservice").withCalls(...)
  // here we declare the topic(s) this service will publish to
  .publishing(
    topic("greetings", this::greetingsTopic)
  )
  ....;
}

И этот пример для реализации:

public Topic<GreetingMessage> greetingsTopic() {
return TopicProducer.singleStreamWithOffset(offset -> {
    return persistentEntityRegistry
        .eventStream(HelloEventTag.INSTANCE, offset)
        .map(this::convertEvent);
  });
}

Однако нет примера того, что такое тип аргумента или тип возвращаемого значения функции convertEvent(), и здесь я ничего не понимаю. С другой стороны, подписчик MessageBroker, похоже, потребляет GreetingMessage объектов, но когда я создаю функцию convertEvent для возврата GreetingMessage объектов, я получаю ошибку компиляции:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types;
  required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T>
  found: this::convertEvent
  reason: cannot infer type-variable(s) T
    (argument mismatch; invalid method reference
  incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage)

Есть ли более подробные примеры того, как это использовать? Я уже проверил образец приложения Chirper, и, похоже, у него нет такого примера.

Спасибо!


person Josh Wickham    schedule 21.02.2017    source источник


Ответы (1)


Вставленное вами сообщение об ошибке говорит вам именно то, что ожидает map:

required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T>

Итак, вам нужно передать функцию, которая принимает Pair<GreetingEvent, Offset>. Что должна вернуть функция? Что ж, обновите его, чтобы принять это, и тогда вы получите следующую ошибку, которая еще раз сообщит вам, что она ожидала от вас, и в этом случае вы обнаружите, что это Pair<GreetingMessage, Offset>.

Чтобы объяснить, что это за типы — Lagom нужно отслеживать, какие события были опубликованы в Kafka, чтобы при перезапуске службы она не запускалась с начала вашего журнала событий и заново не публиковала все события с начала времени. . Это делается с помощью смещений. Итак, журнал событий создает пары событий и смещений, а затем вам нужно преобразовать эти события в сообщения, которые будут опубликованы в Kafka, а когда вы вернули преобразованное сообщение в Lagom, оно должно быть в паре со смещением который вы получили из журнала событий, чтобы после публикации в Kafka Lagom мог сохранить смещение и использовать его в качестве отправной точки при следующем перезапуске службы.

Полный пример можно увидеть здесь: https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding./impl/BiddingServiceImpl.java#L91

person James Roper    schedule 21.02.2017
comment
Спасибо; Я внимательно изучил предоставленный вами пример приложения для аукциона, и он оказался полезным (даже несмотря на то, что он использует taggedStreamWithOffset вместо singleStreamWithOffset (что, честно говоря, в любом случае может быть тем, чего я хочу). Часть, которую я упустил, это аргумент для convertEvent; по причинам, я предполагал, что я испортил тип возвращаемого значения, даже когда у меня был правильный тип возвращаемого значения, и это ввело меня в заблуждение. Еще раз, спасибо за помощь! - person Josh Wickham; 21.02.2017