Alpakka Kafka Stream не перезапускается после ошибок подключения, несмотря на использование RestartSource

У меня есть простой коммитируемый источник для потока Kafka, завернутый в RestartSource. Он отлично работает в счастливом пути, но если я намеренно разорву соединение с кластером Kafka, он выдаст исключение соединения из базового клиента kafka и сообщит о завершении работы Kafka Consumer. Я ожидал, что поток перезапустится через ~ 150 секунд, но это не так. Является ли мое понимание/использование RestartSource неправильным снизу:

val atomicControl = new AtomicReference[Consumer.Control](NoopControl)
val restartablekafkaSourceWithFlow = {
        RestartSource.withBackoff(30.seconds, 120.seconds, 0.2) {
          () => {
            Consumer.committableSource(consumerSettings.withClientId("clientId"), Subscriptions.topics(Set("someTopic")))
              .mapMaterializedValue(c => atomicControl.set(c))
              .via(someFlow)
              .via(httpFlow)
          }
        }
      }
val committerSink: Sink[(Any, ConsumerMessage.CommittableOffset), Future[Done]] = Committer.sinkWithOffsetContext(CommitterSettings(actorSystem))

val runnableGraph = restartablekafkaSourceWithFlow.toMat(committerSink)(Keep.both)

val control = runnableGraph.mapMaterializedValue(x => Consumer.DrainingControl.apply(atomicControl.get, x._2)).run()

person K P    schedule 03.01.2020    source источник
comment
Таким образом, кажется, что я могу добавить контроль над графиком с помощью runnableGraph.withAttributes(ActorAttributes.supervisionStrategy(decider)) и перезапустить весь график, если есть исключение соединения из базового клиента kafka. Однако я не уверен, почему вышеуказанный источник не терпит неудачу?   -  person K P    schedule 04.01.2020
comment
вы должны предоставить более подробную информацию об исключении. если это проблема с сетью, фиксация также может завершиться неудачно (в конце концов, коммиттер также должен общаться с kafka), но ваш код охватывает только исходную часть. если это действительно связано с приемником, вы можете попробовать использовать либо Committer.flowWithOffsetContext, либо обернуть приемник фиксации в RestartSink.   -  person Serhii Shynkarenko    schedule 08.01.2020


Ответы (1)


Возможно, вы получаете ошибку за пределами RestartSource.

Вы можете добавить recover, чтобы увидеть ошибку, и/или создать решающий фактор, как показано ниже, и использовать его в runnableGraph.

private val decider: Supervision.Decider = { e =>
    logger.error("Unhandled exception in stream.", e)
    Supervision.Resume
  }

runnableGraph.withAttributes(supervisionStrategy(decider))
person hadican    schedule 10.04.2021