Флаг setChannelTransacted RabbitTemplate приводит к тому, что сообщение не доставляется в очередь

Учитывая, что у меня есть приложение с анонимной очередью AMQP и разветвленным обменом:

@Bean
public Queue cacheUpdateAnonymousQueue() {
    return new AnonymousQueue();
}

public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";

@Bean
FanoutExchange cacheUpdateExchange() {
    return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
}

@Bean
Binding cacheUpdateQueueToCacheUpdateExchange() {
    return bind(cacheUpdateAnonymousQueue())
            .to(cacheUpdateExchange());
}

и поток интеграции Spring:

@Bean
public IntegrationFlow cacheOutputFlow() {
    return from(channelConfig.cacheUpdateOutputChannel())
            .transform(objectToJsonTransformer())
            .handle(outboundAdapter())
            .get();
}

И я использую исходящий адаптер:

public MessageHandler outboundAdapter() {
    rabbitTemplate.setChannelTransacted(true);
    return outboundAdapter(rabbitTemplate)
            .exchangeName(CACHE_UPDATE_FANOUT_EXCHANGE)
            .get();
}

В логах вижу:

o.s.amqp.rabbit.core.RabbitTemplate: Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@40976c4b Shared Rabbit Connection: SimpleConnection@1cfaa28d [delegate=amqp://[email protected]:5672/, localPort= 56042]
o.s.amqp.rabbit.core.RabbitTemplate: Publishing message on exchange [cache.update.fanout], routingKey = []

но сообщение не доставляется в очередь, связанную с обменом cache.update.fanout.

Когда я устанавливаю rabbitTemplate.setChannelTransacted(false); в исходящем адаптере, я вижу в журналах:

o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@11a1389d Shared Rabbit Connection: SimpleConnection@444c6abf [delegate=amqp://[email protected]:5672/, localPort= 56552]
o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message on exchange [cache.update.fanout], routingKey = []

и сообщение доставляется в очередь.

Почему сообщение не доставляется в первом случае?

Почему RabbitTemplate ничего не указывает?


person Patrik Mihalčin    schedule 02.04.2020    source источник


Ответы (1)


Ваши журналы имеют разные имена обмена; Я только что проверил это так ...

@SpringBootApplication
public class So60993877Application {

    public static void main(String[] args) {
        SpringApplication.run(So60993877Application.class, args);
    }

    @Bean
    public Queue cacheUpdateAnonymousQueue() {
        return new AnonymousQueue();
    }

    public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";

    @Bean
    FanoutExchange cacheUpdateExchange() {
        return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
    }

    @Bean
    Binding cacheUpdateQueueToCacheUpdateExchange() {
        return BindingBuilder.bind(cacheUpdateAnonymousQueue())
                .to(cacheUpdateExchange());
    }

    @RabbitListener(queues = "#{cacheUpdateAnonymousQueue.name}")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, 
                cacheUpdateAnonymousQueue().getName(), "foo");
            template.setChannelTransacted(true);
            template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, 
                cacheUpdateAnonymousQueue().getName(), "bar");
        };
    }

}

Без проблем.

foo
bar

С включенными подтверждениями и возвратами:

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
            LOG.info("Return: " + message));
        template.setConfirmCallback((correlationData, ack, cause) ->
            LOG.info("Confirm: " + correlationData + ": " + ack));
        return args -> {
            template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, cacheUpdateAnonymousQueue().getName(),
                    "foo", new CorrelationData("foo"));
//          template.setChannelTransacted(true);
            template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, cacheUpdateAnonymousQueue().getName(),
                    "bar", new CorrelationData("bar"));
            template.convertAndSend("missingExchange", cacheUpdateAnonymousQueue().getName(), "baz",
                    new CorrelationData("baz"));
            Thread.sleep(5000);
        };
    }
person Gary Russell    schedule 02.04.2020
comment
Я сделал опечатку во второй записи журнала, извините. Может ли такое поведение быть связано с версией брокера rabbitmq? - person Patrik Mihalčin; 02.04.2020
comment
Я очень сомневаюсь. Попробуйте включить подтверждения и возвраты издателя в фабрике соединений (и добавьте слушателей в RabbitTemplate для их отображения), но вы можете использовать подтверждения только в том случае, если транзакция false. - person Gary Russell; 02.04.2020
comment
+ Я также использую старые зависимости Rabbit: [INFO] | \- org.springframework.amqp:spring-rabbit:jar:1.7.6.RELEASE:compile [INFO] | +- org.springframework.amqp:spring-amqp:jar:1.7.6.RELEASE:compile [INFO] | +- com.rabbitmq:http-client:jar:1.1.1.RELEASE:compile [INFO] | \- com.rabbitmq: amqp-client: jar: 4.0.3: скомпилировать - person Patrik Mihalčin; 02.04.2020
comment
попробую то, что вы предложили - person Patrik Mihalčin; 02.04.2020
comment
Ой!! Это 2 года; вы должны хотя бы обновиться до 1.7.14. Однако я очень сомневаюсь, что проблема в этом (но я проверю). - person Gary Russell; 02.04.2020