Учитывая, что у меня есть приложение с анонимной очередью AMQP и разветвленным обменом:
@Bean
public Queue cacheUpdateAnonymousQueue() {
return new AnonymousQueue();
}
public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";
@Bean
FanoutExchange cacheUpdateExchange() {
return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
}
@Bean
Binding cacheUpdateQueueToCacheUpdateExchange() {
return bind(cacheUpdateAnonymousQueue())
.to(cacheUpdateExchange());
}
и поток интеграции Spring:
@Bean
public IntegrationFlow cacheOutputFlow() {
return from(channelConfig.cacheUpdateOutputChannel())
.transform(objectToJsonTransformer())
.handle(outboundAdapter())
.get();
}
И я использую исходящий адаптер:
public MessageHandler outboundAdapter() {
rabbitTemplate.setChannelTransacted(true);
return outboundAdapter(rabbitTemplate)
.exchangeName(CACHE_UPDATE_FANOUT_EXCHANGE)
.get();
}
В логах вижу:
o.s.amqp.rabbit.core.RabbitTemplate: Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@40976c4b Shared Rabbit Connection: SimpleConnection@1cfaa28d [delegate=amqp://[email protected]:5672/, localPort= 56042]
o.s.amqp.rabbit.core.RabbitTemplate: Publishing message on exchange [cache.update.fanout], routingKey = []
но сообщение не доставляется в очередь, связанную с обменом cache.update.fanout
.
Когда я устанавливаю rabbitTemplate.setChannelTransacted(false);
в исходящем адаптере, я вижу в журналах:
o.s.amqp.rabbit.core.RabbitTemplate : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@11a1389d Shared Rabbit Connection: SimpleConnection@444c6abf [delegate=amqp://[email protected]:5672/, localPort= 56552]
o.s.amqp.rabbit.core.RabbitTemplate : Publishing message on exchange [cache.update.fanout], routingKey = []
и сообщение доставляется в очередь.
Почему сообщение не доставляется в первом случае?
Почему RabbitTemplate ничего не указывает?