Я только начинаю изучать Spring Cloud Streams и Dataflow и хочу узнать об одном из важных для меня вариантов использования. Я создал пример процессора Multiplier, который принимает сообщение и повторно отправляет его 5 раз на вывод.
@EnableBinding(Processor.class)
public class MultiplierProcessor {
@Autowired
private Source source;
private int repeats = 5;
@Transactional
@StreamListener(Processor.INPUT)
public void handle(String payload) {
for (int i = 0; i < repeats; i++) {
if(i == 4) {
throw new RuntimeException("EXCEPTION");
}
source.output().send(new GenericMessage<>(payload));
}
}
}
Что вы можете видеть, так это то, что перед 5-й отправкой этот процессор дает сбой. Почему? Потому что может (программы выбрасывают исключения). В этом случае я хотел попрактиковаться в предотвращении сбоев в Spring Cloud Stream.
Чего я хотел бы добиться, так это иметь входное сообщение, поддерживаемое в DLQ, и 4 сообщения, которые были отправлены до того, как они будут возвращены и не будут использованы следующим операндом (точно так же, как в обычной транзакции JMS). Я уже пытался определить следующие свойства в своем проекте процессора, но безуспешно.
spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.consumer.autoBindDlq=true
Не могли бы вы сказать мне, возможно ли это, а также что я делаю неправильно? Буду очень признателен за несколько примеров.