Чтобы управлять длительной задачей с помощью Spring Cloud Stream 3.1.1 со связывателем Kafka, нам нужно использовать Pollable Consumer для управления потреблением вручную в отдельном потоке, чтобы Kafka не запускал ребалансировку. Для этого мы определили новую аннотацию для управления Pollable Consumer. Проблема с этим подходом заключается в том, что работой необходимо управлять в отдельном потоке, любое возникшее исключение в конечном итоге не попадет в errorChannel
и DLQ.
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableConsumer) && args(dataCapsule,..)")
public void handleMessage(ProceedingJoinPoint joinPoint,
PollableConsumer pollableConsumer, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableConsumerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
Мы можем создать другой выходной канал для публикации сообщения в случае исключения, но нам кажется, что мы пытаемся реализовать что-то, что может быть ненужным.
Обновление 1
Мы добавили эти бобы:
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"http://localhost:9092");
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topicErr() {
return TopicBuilder.name("ERR").partitions(1).replicas(1).build();
}
@Bean
public SeekToCurrentErrorHandler eh(KafkaOperations<String, byte[]> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
template,
(cr, e) -> new TopicPartition("ERR", 1)),
new FixedBackOff(0L, 1L));
}
И enable-dlq
не установлен в spring.cloud.stream.kafka.bindings.channel-name.consumer
, но по-прежнему мы не видим никаких сообщений, отправляемых по теме ERR. Даже для любых исключений, созданных основным потоком.
Если для enable-dlq
установлено значение true, исключения в основном потоке будут публиковаться в теме dlq по умолчанию, и, как и ожидалось, исключения в дочернем потоке игнорируются.
Обновление 2
Пример Гэри в целом работает. Хотя нам нужно было внести некоторые изменения, поскольку мы используем устаревший подход StreamListner вместо функций, есть несколько проблем, которые мы не смогли решить в нашем случае.
- Предполагается, что название темы всегда будет
channel_name+.DLT
, так как мы не могли понять, как можно использовать другое имя, напримерdlq
. Мы используем однуdlq
тему для всех потребителей, что, похоже, не соответствует ожиданиям DLT Spring-kafka по умолчанию. - Похоже, нам нужно иметь как минимум такое же количество разделов в DLT, что и в теме-потребителе. В противном случае это решение не сработает. Не уверен, как с этим можно справиться, поскольку это не кажется нам практическим предположением.
- Есть ли способ использовать повторные попытки Spring, аналогичные тому, что Spring Cloud Stream делает за кулисами? Или это нужно реализовывать отдельно? т.е. повторная попытка работы на основе
max.attempts
, а затем включение части DLQ. - Я мог видеть, что в примере пружинный привод использовался для обновления статуса канала через
this.endpoint.changeState("polled", State.PAUSED)
иthis.endpoint.changeState("polled", State.RESUMED)
. Почему нам нужно делать это вместе с паузами, повторной постановкой в очередь и т. Д. Каков побочный эффект невыполнения этого действия?