Как создать процессор с транзакцией и DLQ с привязкой Rabbit?

Я только начинаю изучать Spring Cloud Streams и Dataflow и хочу узнать об одном из важных для меня вариантов использования. Я создал пример процессора Multiplier, который принимает сообщение и повторно отправляет его 5 раз на вывод.

@EnableBinding(Processor.class)
public class MultiplierProcessor {
    @Autowired
    private Source source;

    private int repeats = 5;

    @Transactional
    @StreamListener(Processor.INPUT)
    public void handle(String payload) {
        for (int i = 0; i < repeats; i++) {
            if(i == 4) {
                throw new RuntimeException("EXCEPTION");
            }
            source.output().send(new GenericMessage<>(payload));
        }
    }
}

Что вы можете видеть, так это то, что перед 5-й отправкой этот процессор дает сбой. Почему? Потому что может (программы выбрасывают исключения). В этом случае я хотел попрактиковаться в предотвращении сбоев в Spring Cloud Stream.

Чего я хотел бы добиться, так это иметь входное сообщение, поддерживаемое в DLQ, и 4 сообщения, которые были отправлены до того, как они будут возвращены и не будут использованы следующим операндом (точно так же, как в обычной транзакции JMS). Я уже пытался определить следующие свойства в своем проекте процессора, но безуспешно.

spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.consumer.autoBindDlq=true

Не могли бы вы сказать мне, возможно ли это, а также что я делаю неправильно? Буду очень признателен за несколько примеров.


person Mati    schedule 18.08.2016    source источник


Ответы (1)


У вас есть несколько проблем с вашей конфигурацией:

  • отсутствует .rabbit в свойствах кролика)
  • вам нужно имя группы и постоянная подписка, чтобы использовать autoBindDlq
  • autoBindDlq не применяется на стороне вывода

Потребитель должен быть транзакцией, чтобы отправка производителя выполнялась в той же транзакции.

Я только что проверил это с 1.0.2.RELEASE:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true

и это сработало, как и ожидалось.

ИЗМЕНИТЬ

На самом деле нет, откат опубликованных сообщений не производился. Расследование...

ИЗМЕНИТЬ2

ХОРОШО; это работает, но вы не можете использовать republishToDlq, потому что, когда это разрешено, связующее устройство публикует сообщение о сбое в DLQ, и транзакция фиксируется.

Когда это ложно, в контейнер выбрасывается исключение, транзакция откатывается, и RabbitMQ перемещает ошибочное сообщение в DLQ.

Обратите внимание, однако, что повторная попытка включена по умолчанию (3 попытки), поэтому, если ваш процессор преуспеет во время повторной попытки, вы получите дубликаты в своем выводе.

Чтобы это работало так, как вы хотите, вам нужно отключить повтор, установив максимальное количество попыток на 1 (и не используйте republishToDlq).

ИЗМЕНИТЬ3

Хорошо, если вы хотите больше контролировать публикацию ошибок, это сработает, когда исправление для этого JIRA применяется к Spring AMQP...

@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {

    public static void main(String[] args) {
        SpringApplication.run(So39018400Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    public interface Errors {

        @Output("errors")
        MessageChannel errorChannel();

    }

    private static class Foo {

        @Autowired
        Source source;

        @Autowired
        Errors errors;

        @StreamListener(Processor.INPUT)
        public void handle (Message<byte[]> in) {
            try {
                source.output().send(new GenericMessage<>("foo"));
                source.output().send(new GenericMessage<>("foo"));
                throw new RuntimeException("foo");
            }
            catch (RuntimeException e) {
                errors.errorChannel().send(MessageBuilder.fromMessage(in)
                        .setHeader("foo", "bar") // add whatever you want, stack trace etc.
                        .build());
                throw e;
            }
        }

    }

}

со свойствами:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false


spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1
person Gary Russell    schedule 18.08.2016
comment
Привет, @gary-russell, спасибо за ответ. Один вопрос по этому поводу. Нужно ли мне устанавливать output.destination, когда я использую этот процессор с потоком данных? - person Mati; 18.08.2016
comment
Смотрите мое второе редактирование для получения дополнительной информации. Если вы не укажете адресата, сообщение будет опубликовано на бирже под названием output с ключом маршрутизации output. - person Gary Russell; 18.08.2016
comment
Извините, вы сказали с потоком данных - нет, поток данных заботится об именах вещей. - person Gary Russell; 18.08.2016
comment
Большое спасибо, это действительно сработало :) Но не могли бы вы сказать мне, почему я не могу использовать republishToDlq ? Наличие дополнительной информации о том, что произошло, было бы очень полезно для расследования. - person Mati; 18.08.2016
comment
Повторная публикация связующим считается восстановлением сбоя, поэтому транзакция фиксируется (включая ваши выходные данные в этом случае). Я удалил свой предыдущий комментарий (о повторной публикации в вашем приложении), потому что это не сработает — повторная публикация по-прежнему выполняется в той же транзакции. Мне нужно немного поработать в Spring AMQP, чтобы этого избежать. Я открыл выпуск JIRA. - person Gary Russell; 18.08.2016