У меня есть простой коммитируемый источник для потока Kafka, завернутый в RestartSource. Он отлично работает в счастливом пути, но если я намеренно разорву соединение с кластером Kafka, он выдаст исключение соединения из базового клиента kafka и сообщит о завершении работы Kafka Consumer. Я ожидал, что поток перезапустится через ~ 150 секунд, но это не так. Является ли мое понимание/использование RestartSource неправильным снизу:
val atomicControl = new AtomicReference[Consumer.Control](NoopControl)
val restartablekafkaSourceWithFlow = {
RestartSource.withBackoff(30.seconds, 120.seconds, 0.2) {
() => {
Consumer.committableSource(consumerSettings.withClientId("clientId"), Subscriptions.topics(Set("someTopic")))
.mapMaterializedValue(c => atomicControl.set(c))
.via(someFlow)
.via(httpFlow)
}
}
}
val committerSink: Sink[(Any, ConsumerMessage.CommittableOffset), Future[Done]] = Committer.sinkWithOffsetContext(CommitterSettings(actorSystem))
val runnableGraph = restartablekafkaSourceWithFlow.toMat(committerSink)(Keep.both)
val control = runnableGraph.mapMaterializedValue(x => Consumer.DrainingControl.apply(atomicControl.get, x._2)).run()
Committer.flowWithOffsetContext
, либо обернуть приемник фиксации вRestartSink
. - person Serhii Shynkarenko   schedule 08.01.2020