Spring: отправка нескольких сообщений с помощью цикла

Скажем, у меня есть собственный модуль в Spring XD (я использую Spring XD + Spring Integration + hibernate). Модуль в основном получает что-то из БД (скажем, я сохраняю его с помощью объекта гибернации, поэтому я использую объект под названием «DataFromDB»). DataFromDB - это список, затем я получаю каждый элемент из списка и хочу отправить его, используя что-то вроде:

String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
  • Проблема в том, что каждый раз, когда мне нужно отправить сообщение, я должен вернуть сообщение.
  • Поэтому я хотел бы перебрать список DataFromDB и отправить каждый элемент в виде сообщения.

Есть ли способ отправить несколько сообщений?

Редактировать:

Я просто создал небольшой пример в соответствии с комментариями, пытаясь воспроизвести сценарий. Вот что у меня есть:

мой класс трансформатора:

public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
    List<Message<?>> messages = new ArrayList<Message<?>>();
    messages.add(new GenericMessage<>("foo"));
    messages.add(new GenericMessage<>("bar"));
    return messages;
}

Моя конфигурация xml:

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx.xsd">

    <tx:annotation-driven />
    <int:channel id="input" />

    <bean id="transFormerClass" class="myModule.TransformerClass">
    </bean>

    <int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>

    <int:channel id="output"/>

</beans>

Мой тестовый класс:

    @ModuleName(value = "someModule", type = ModuleType.processor)
public class TransformerClassTest extends xDTest {
    private String streamName = "myStream";
    private String chainTest = "someModule";

    @SuppressWarnings("unchecked")
    @Test
    public void testPartnerNotification() throws IOException {

        this.chain = SingleNodeProcessingChainSupport.chain(application, streamName, chainTest);
        //Just to send something to the module as a input.
        Message<String> input = MessageBuilder.createMessage("hello world", buildHeaders());
        this.chain.send(input);
        //Receiving a Single message
        Message<String> result = (Message<String>) chain.receive(5000);
        System.out.println("Result: " + result);
    }

    private MessageHeaders buildHeaders() {
        Map<String, Object> hashMap = new HashMap<String,Object>();
        hashMap.put("test", "testing");
        MessageHeaders headers = new MessageHeaders(hashMap);
        return headers;
    }    
}

Выход:

Результат: GenericMessage [payload = [GenericMessage [payload = foo, headers = {timestamp = 1475072951345, id = 7b6c79a2-db85-563e-c238-262a31141456}], GenericMessage [payload = bar, headers = {id713745 = {ide07295 = {timestamp = 147505] -3513-b95e-3a25-4fd3550f2fea}]], заголовки = {timestamp = 1475072951347, id = f90d94c4-e323-70ed-62ee-4b8bce64814d, test = testing}]

Я использую Spring Integration 4.2.2.RELEASE.


person Columb1a    schedule 26.09.2016    source источник
comment
Вы пробовали просто использовать цикл?   -  person chrylis -cautiouslyoptimistic-    schedule 26.09.2016
comment
Пока не честно. Но для меня это не имеет смысла, так как для отправки сообщения я должен вернуть MessageBuilder.createMessage(payload, message.getHeaders()); Следовательно, в этом случае будет отправлено только одно сообщение (тогда только один элемент в списке) t.   -  person Columb1a    schedule 26.09.2016


Ответы (1)


Если вы вернете Collection<Message<?>>, они будут отправлены как отдельные сообщения.

ИЗМЕНИТЬ

@EnableIntegration
@Configuration
public class So39708474Application {

    @Bean
    public DirectChannel input() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel output() {
        return new DirectChannel();
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    public static class Foo {

        @ServiceActivator(inputChannel = "input", outputChannel = "output")
        public Collection<Message<?>> handle(Message<?> in) {
            List<Message<?>> messages = new ArrayList<Message<?>>();
            messages.add(new GenericMessage<>("foo"));
            messages.add(new GenericMessage<>("bar"));
            return messages;
        }

    }

}

и

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = So39708474Application.class)
public class So39708474ApplicationTests {

    @Autowired
    private MessageChannel input;

    @Autowired
    private SubscribableChannel output;

    @Test
    public void contextLoads() {
        AtomicInteger count = new AtomicInteger();
        this.output.subscribe(m -> {
            count.incrementAndGet();
            System.out.println(m);
        });
        this.input.send(new GenericMessage<>("test"));
        assertEquals(2, count.get());
    }

}
person Gary Russell    schedule 26.09.2016
comment
Итак, после того, как я вернул Collection<Message<?>>, я думаю, мне придется создать новый модуль SpringXD, чтобы разделить сообщения, которые я получаю. После этого я смогу делать с каждым из этих сообщений все, что захочу. - person Columb1a; 26.09.2016
comment
Я сбит с толку - вы сказали, что у вас уже есть настраиваемый модуль и вы хотите создать несколько выходных сообщений для одного входного сообщения. Именно это и произойдет; вам не нужен разветвитель. Если вы вернетесь Collection<Payload>, вы это сделаете, но Collection<Message<?>> сообщит фреймворку, что вы хотите отправить несколько выходных данных. - person Gary Russell; 27.09.2016
comment
Ой, Потрясающе!. Большое спасибо за разъяснения. Это именно то, что я искал. Итак, мне просто нужно использовать Collection<Message<?>>, и любой модуль, который получает в качестве входных данных выход моего текущего модуля, получит одно сообщение. - person Columb1a; 27.09.2016
comment
Привет @ Гэри Рассел. Я только что создал тест Junit, чтобы проверить результат, связанный с тем, что я реализовал. Я жестко закодировал свою логику, чтобы заполнить возвращаемую коллекцию двумя сообщениями. Я заметил, что получаю одно сообщение, которое содержит 2 сообщения вместо 2 сообщений по отдельности. Message<String> input = MessageBuilder.createMessage(alertDetail, buildHeaders()); this.chain.send(input); Message<?> result = chain.receive(5000); System.out.println("Result: " + result); - person Columb1a; 27.09.2016
comment
Вам нужно показать код (включая тест) - отредактируйте свой вопрос. Также какую версию Spring Integration вы используете? Я тестировал 4.3, 4.2 и 4.1. Но логика для создания набора сообщений по отдельности существовала практически всегда. - person Gary Russell; 27.09.2016
comment
Мне не нужно видеть никакой логики, мне просто нужно увидеть сигнатуры классов и методов и то, как именно вы их тестируете. И как вы настраиваете приложение (определения XML или @Bean). Я отредактировал свой ответ примером, который показывает 2 вывода сообщения на ввод. - person Gary Russell; 27.09.2016
comment
Я только что добавил пример того, что только что сделал. Когда я выполняю chain.receive(5000), я ожидаю получить одно сообщение, но на самом деле я получаю одно сообщение, а внутри него еще два сообщения, которые в основном представляют собой список, который был отправлен на выход. - person Columb1a; 28.09.2016
comment
Ах - трансформаторы особенные - структура предполагает, что трансформатор берет на себя полную ответственность за то, что выводится. Измените его на <int:service-activator/>, и он будет делать то, что вы хотите. В этом случае вы не преобразуете сообщение, вы вызываете какую-то службу, поэтому активатор службы является правильным элементом для использования. - person Gary Russell; 28.09.2016