Получить информацию о сообщении в Spring RecoveryCallback

Я публикую сообщения в RabbitMQ, и я хотел бы отслеживать ошибки, когда RabbitMQ не работает, для этого я добавил один RetryTemplate с обратным вызовом восстановления, но обратный вызов восстановления предоставляет только этот метод getLastThrowable(), и я не уверен, как обеспечить сведения о сообщениях, которые не удалось выполнить, когда RabbitMQ не работает. (согласно документации «RecoveryCallback несколько ограничен тем, что контекст повтора содержит только поле lastThrowable. Для более сложных случаев использования вы должны использовать внешний RetryTemplate, чтобы вы могли передавать дополнительную информацию в RecoveryCallback через атрибуты контекста»), но Я не знаю, как это сделать, если бы кто-нибудь мог помочь мне с одним примером, это было бы здорово.

Шаблон кролика

public RabbitTemplate rabbitMqTemplate(RecoveryCallback publisherRecoveryCallback) {
    RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
    r.setExchange(exchangeName);
    r.setRoutingKey(routingKey);
    r.setConnectionFactory(rabbitConnectionFactory);
    r.setMessageConverter(jsonMessageConverter());

    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    r.setRetryTemplate(retryTemplate);
    r.setRecoveryCallback(publisherRecoveryCallback);
    return r;
    }

Обратный вызов восстановления

@Component
public class PublisherRecoveryCallback implements RecoveryCallback<AssortmentEvent> {
    @Override
    public AssortmentEvent recover(RetryContext context) throws Exception {
        log.error("Error publising event",context.getLastThrowable());
        //how to get message details here??
        return null;
    }
}

Исходящий адаптер AMQP

return IntegrationFlows.from("eventsChannel") .split() .handle(Amqp.outboundAdapter(rabbitMqTemplate) .exchangeName(exchangeName) .confirmCorrelationExpression("payload") .confirmAckChannel(ackChannel) .confirmNackChannel(nackChannel) ) .get();


person Yerko Aguirre    schedule 26.07.2017    source источник


Ответы (1)


Это невозможно, потому что функция RabbitTemplate.execute() уже не знает об отправленном вами сообщении, потому что это может быть выполнено любым другим методом, где у нас может не быть сообщений для обработки:

return this.retryTemplate.execute(
                    (RetryCallback<T, Exception>) context -> RabbitTemplate.this.doExecute(action, connectionFactory),
                    (RecoveryCallback<T>) this.recoveryCallback);

То, что я предлагаю вам сделать, похоже на сохранение сообщения в ThreadLocal перед отправкой и получение его оттуда из вашего пользовательского RecoveryCallback.

person Artem Bilan    schedule 27.07.2017
comment
Проблема в том, что я использую Spring Integration DSL и у меня нет контроля над rabbitTemplate. - person Yerko Aguirre; 27.07.2017
comment
Итак, вы можете использовать .handle() для сохранения значения в ThreadLocal и возврата его для следующего .handle(Amqp.outboundChannelAdapter()) - person Artem Bilan; 27.07.2017
comment
отлично, я попытаюсь сохранить сообщение (локальный поток) в первом дескрипторе (), а затем передать это значение исходящему адаптеру, а затем удалить это сообщение из локального потока, как только я получу подтверждение, спасибо, Артем !!!! !!! - person Yerko Aguirre; 27.07.2017
comment
Это правильно. Рад быть полезным: stackoverflow.com/help/someone-answers - person Artem Bilan; 27.07.2017