Получить ссылку на исходный элемент после Akka Streams Sink?

Я пытаюсь использовать разъем Amqp alpakka в качестве источника и приемника.

Source<CommittableReadResult, NotUsed> AMQP_SOURCE  
      -> processAndGetResponse 
      -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 

Я хочу подтвердить сообщение, полученное из очереди Amqp, после успешной операции Sink. как это:

amqp_source(committableReadResult) 
    -> processAndGetResponse 
    -> amqp_sink 
    -> IfSinkOperationSuccess.Then(committableReadResult.ack())

Как я могу этого добиться? Я в основном хочу пометить сообщение как подтверждение только после успешной операции приемника.


person Jerald Baker    schedule 16.03.2020    source источник
comment
Если вы используете akka-stream v2.5.20+, советую посмотреть SourceWithContext и Блог Software Mill   -  person chauhraj    schedule 22.03.2020


Ответы (1)


В общем, если у вас есть Sink, который материализуется в Future[Done] (Scala API, я полагаю, что Java-эквивалент CompletionStage[Done]), вы можете просто запустить подпоток с этим Sink на этапе mapAsync. Данные будут передаваться только в случае успеха.

При беглом просмотре API Alpakka AMQP все Sink материализуются таким образом, так что этот подход будет работать.

Предполагая, что ваша функция doSomethingWithMessage приводит как к CommittableReadResult, так и к WriteMessage, в Scala будет работать что-то вроде следующего:

def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???

amqpSource
  .map(doSomethingWithMessage)
  .mapAsync(parallelism) { tup =>
    val (crr, wm) = tup
    val fut = Source.single(crr, wm).runWith(amqpSink)
    fut.flatMap(_ => crr.ack().map(_ => crr.message))
  }.runWith(Sink.ignore)
person Levi Ramsey    schedule 17.03.2020
comment
Спасибо за ответ. Это полезно. В этом случае не будет ли приемник повторно инициализирован, т. е. создавать новое соединение с сервером amqp для каждого элемента в потоке? - person Jerald Baker; 18.03.2020