Как я могу разделить совокупный обмен GroupedExchangeAggregationStrategy на исходные биржи?

После агрегирования обменов с использованием GroupedExchangeAggregationStrategy мне нужно разделить их на части (для выдачи индивидуальных показателей времени обработки) на исходные обмены.

Я пробовал разделить следующим образом, но полученный разделенный обмен обертывает исходный обмен и помещает его в тело Message.

Можно ли разделить GroupedExchangeAggregationStrategy агрегированный обмен на исходные обмены без обмена оболочкой? Мне нужно использовать исходные свойства обмена, и я хотел бы сделать это с помощью выражения SpEL.

.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
    .completionInterval(1000)
    .completeAllOnStop()
    .process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
    .to(/* micrometer timer metric using SpEL expression */)
    // ^- the resulting split exchange is wrapped in another exchange

Если это в настоящее время не поддерживается, я пытаюсь найти лучший способ реализовать это поведение самостоятельно, не создавая собственный Splitter процессор для этой единственной функции. Я надеялся каким-то образом переопределить SplitterIterable, выполняющий обертывание, но это оказалось невозможным.


person geg    schedule 09.04.2019    source источник


Ответы (2)


Да, _ 1_ ничего не делает, кроме создания java.util.List из всех Биржи. С другой стороны, Splitter EIP разбивает список по умолчанию на элементы и помещает элемент в тело сообщения. Таким образом, вы получаете биржу, которая содержит биржу в своем теле.

Что вам нужно, так это AggregationStrategy, которая собирает все объекты тела в список, а не все биржи.

Вы можете попробовать использовать Camels _ 3_, настраиваемый через свободный API.

new FlexibleAggregationStrategy () .storeInBody () .accumulateInCollection (ArrayList.class) .pick (new SimpleExpression ("$ {body}"));

Это должно создать AggregationStrategy, который извлекает тело каждого сообщения (вы, возможно, можете опустить метод выбора, поскольку извлечение тела является выбором по умолчанию), собирает их в список и сохраняет агрегацию в теле сообщения.

Чтобы снова разделить эту совокупность, должно быть достаточно простого split(body()).

ИЗМЕНИТЬ из-за комментария

Да, вы правы, побочным эффектом моего решения является то, что вы теряете свойства и заголовки исходных сообщений, потому что оно объединяет только тела сообщений.

Что вы хотите сделать, так это разбить список обменов обратно на оригиналы. т.е. разделитель не должен создавать новые биржи, а использовать уже существующие и отбрасывать агрегированную биржу-оболочку.

Насколько я могу видеть в исходный код Splitter, это в настоящее время невозможно:

Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
...
if (part instanceof Message) {
    newExchange.setIn((Message) part);
} else {
    Message in = newExchange.getIn();
    in.setBody(part);
}
person burki    schedule 10.04.2019
comment
Спасибо, но я не думаю, что это сработает для меня, потому что я считаю, что ваша стратегия потеряет свойства исходных неагрегированных обменов, которые требуются библиотекой метрик, которую я использую (микрометр) - person geg; 10.04.2019
comment
Я расширил свой комментарий, к сожалению, то, что вы хотите, кажется невозможным - person burki; 12.04.2019
comment
да, согласен, похоже, он не поддерживается изначально. Я реализовал собственное решение (см. Ниже), которое распадается на почти идентичное состояние - person geg; 12.04.2019

Согласно принятому ответу, он, похоже, не поддерживается изначально.

Этот специальный процессор развернет разделенный обмен (т. Е. Копирует вложенный обмен Message и свойства в корневой обмен). Развернутый обмен будет почти идентичен исходному - он сохранит все неконфликтующие свойства корневого обмена (например, свойства, связанные с Splitter, такие как разделенный индекс и т. Д.)

class ExchangeUnwrapper : Processor {

    override fun process(exchange: Exchange) {
        val wrappedExchange = exchange.`in`.body as Exchange
        ExchangeHelper.copyResultsPreservePattern(exchange, wrappedExchange)
    }
}

// Route.kt

from(...)
.aggregate(...)
.process { /* do things with aggregate */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
  .process(ExchangeUnwrapper())
  .process { /* do something with the original exchange */ }
.end()
person geg    schedule 12.04.2019