Spring Integration DSL - результаты JdbcPollingChannelAdapter не в очереди

Клянусь, у меня это сработало, но когда я смогу вернуться к нему через несколько месяцев (и после обновления до Boot 1.5.9), у меня возникнут проблемы.

Я установил JdbcPollingChannelAdapter, на котором я могу выполнять функцию receive (), но когда я помещаю адаптер в поток, который не делает ничего, кроме очереди, результат адаптера, запуск .receive в очереди всегда возвращает нуль (я Однако в журнале консоли можно увидеть, что выполняется SQL адаптера).

Тесты ниже. Почему я могу получить результаты от адаптера, но не поставить их в очередь? Заранее благодарим вас за любую помощь.

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureTestDatabase
@JdbcTest
public class JdbcpollingchanneladapterdemoTests {

  @Autowired
  @Qualifier("dataSource")
  DataSource dataSource;

  private static PollableChannel outputQueue;

    @BeforeClass
    public static void setupClass() {
    outputQueue = MessageChannels.queue().get();
        return;
    }

    @Test
    public void Should_HaveQueue() {
        assertThat(outputQueue, instanceOf(QueueChannel.class));
    }

    @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
      statements = "Drop Table DEMO ;")
    public void Should_Not_HaveMessageOnTheQueue_When_No_DemosAreInTheDatabase() {
        Message<?> message = outputQueue.receive(5000);
        assertThat(message, nullValue()) ;
    }

  @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');")
  @Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
      statements = "Drop Table DEMO ;")
  public void Should_HaveMessageOnTheQueue_When_DemosIsInTheDatabase() {
    assertThat(outputQueue, instanceOf(QueueChannel.class));
    Message<?> message = outputQueue.receive(5000);
    assertThat(message, notNullValue());
    assertThat(message.getPayload().toString(), equalTo("15317")) ;
  }

  @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');")
  @Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
      statements = "Drop Table DEMO ;")
  public void get_message_directly_from_adapter() {
    JdbcPollingChannelAdapter adapter =
        new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
    adapter.setRowMapper(new DemoRowMapper());
    adapter.setMaxRowsPerPoll(1);
    Message<?> message = adapter.receive();
    assertThat(message, notNullValue());
  }


  private static class Demo {

    private String demo;

    String getDemo() {
      return demo;
    }

    void setDemo(String value) {
      this.demo = value;
    }

    @Override
    public String toString() {
      return "Demo [value=" + this.demo + "]";
    }
  }

  public static class DemoRowMapper implements RowMapper<Demo> {

    @Override
    public Demo mapRow(ResultSet rs, int rowNum) throws SQLException {
      Demo demo = new Demo();
      demo.setDemo(rs.getString("CODE"));
      return demo;
    }
  }

  @Component
  public static class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    @Qualifier("dataSource")
    DataSource dataSource;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {

      JdbcPollingChannelAdapter adapter =
          new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
      adapter.setRowMapper(new DemoRowMapper());
      adapter.setMaxRowsPerPoll(1);

      return from(adapter,
          c -> c.poller(Pollers.fixedRate(1000L, 2000L)
              .maxMessagesPerPoll(1)
              .get()))
          .channel(outputQueue);
    }
  }
}

ИЗМЕНИТЬ. Я упростил его, насколько смог, рефакторинг кода ниже. Тест проходит поток с общим источником сообщения и терпит неудачу в потоке с источником сообщения JdbcPollingChannelAdapter. Мне просто не очевидно, как мне настроить второй источник сообщений, чтобы он работал так же, как первый источник сообщений.

  @Test
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Create Table DEMO (CODE VARCHAR(5));")
  @Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');")
  public void Should_HaveMessageOnTheQueue_When_UnsentDemosIsInTheDatabase() {

this.genericFlowContext.registration(new GenericFlowAdapter()).register();

PollableChannel genericChannel = this.beanFactory.getBean("GenericFlowAdapterOutput",
    PollableChannel.class);

this.jdbcPollingFlowContext.registration(new JdbcPollingFlowAdapter()).register();

PollableChannel jdbcPollingChannel = this.beanFactory.getBean("JdbcPollingFlowAdapterOutput",
    PollableChannel.class);

assertThat(genericChannel.receive(5000).getPayload(), equalTo("15317"));

assertThat(jdbcPollingChannel.receive(5000).getPayload(), equalTo("15317"));
  }

  private static class GenericFlowAdapter extends IntegrationFlowAdapter {

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
  return from(getObjectMessageSource(),
      e -> e.poller(Pollers.fixedRate(100)))
      .channel(c -> c.queue("GenericFlowAdapterOutput"));
}

private MessageSource<Object> getObjectMessageSource() {
  return () -> new GenericMessage<>("15317");
}
}

private static class JdbcPollingFlowAdapter extends IntegrationFlowAdapter {

@Autowired
@Qualifier("dataSource")
DataSource dataSource;

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
  return from(getObjectMessageSource(),
      e -> e.poller(Pollers.fixedRate(100)))
      .channel(c -> c.queue("JdbcPollingFlowAdapterOutput"));
}

private MessageSource<Object> getObjectMessageSource() {
  JdbcPollingChannelAdapter adapter =
      new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
  adapter.setRowMapper(new DemoRowMapper());
  adapter.setMaxRowsPerPoll(1);
  return adapter;
}
  }

person Curtis Olson    schedule 19.01.2018    source источник


Ответы (1)


Похоже, вам нужно добавить @EnableIntegration в вашу тестовую конфигурацию. Когда вы используете для тестирования срезы Spring Boot, загружаются не все автоконфигурации:

https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/reference/htmlsingle/#test-auto-configuration.

ОБНОВЛЕНИЕ

Проблема в том, что JdbcPollingChannelAdapter выполняется в отдельном запланированном потоке, уже вне исходной транзакции, в методе тестирования, где выполняются эти @Sql.

Исправление для вас такое:

@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
      statements = "Insert into DEMO (CODE) VALUES ('12345');",
      config = @SqlConfig(transactionMode = SqlConfig.TransactionMode.ISOLATED))

Обратите внимание на то SqlConfig.TransactionMode.ISOLATED. Таким образом, транзакция INSERT фиксируется, и данные доступны для этого отдельного потока опроса для JdbcPollingChannelAdapter.

Также обратите внимание, что этот JdbcPollingChannelAdapter всегда возвращает List записей. Итак, ваш assertThat(jdbcPollingChannel.receive(5000).getPayload(), ...); должен быть против List<String>, даже если в таблице есть только одна запись.

person Artem Bilan    schedule 19.01.2018
comment
Да вы конечно правы. Спасибо. К сожалению, однако, добавление аннотации оказалось не решением, опрос очереди по-прежнему возвращает нуль в моем случае. - person Curtis Olson; 20.01.2018
comment
OK. Может быть, @ComponentScan тоже должен быть там? Как контекст приложения понимает, что ваш MyFlowAdapter необходимо сканировать, если мы все равно находимся в тестовом срезе? - person Artem Bilan; 20.01.2018
comment
Я вставил правку выше. На данный момент сходство двух потоков заставляет меня поверить, что тестового контекста достаточно. Как отмечалось выше, я максимально упростил его. Тест проходит поток с общим источником сообщения и терпит неудачу в потоке с источником сообщения JdbcPollingChannelAdapter. Мне просто не очевидно, как мне настроить второй источник сообщений, чтобы он работал так же, как первый источник сообщений. - person Curtis Olson; 20.01.2018
comment
Странный. Могу ли я поиграть в простой проект Spring Boot со своей стороны? Спасибо - person Artem Bilan; 20.01.2018
comment
Конечно. Как бы вы предпочли, чтобы я передал его вам? - person Curtis Olson; 20.01.2018
comment
Через GitHub, пожалуйста - person Artem Bilan; 20.01.2018
comment
Спасибо, пожалуйста, найдите ссылку ниже: github.com/gullywompr/ - person Curtis Olson; 20.01.2018
comment
Это был отличный образец для подражания. Спасибо! Теперь, пожалуйста, найдите UPDATE в моем ответе на решение. - person Artem Bilan; 22.01.2018
comment
ОНО РАБОТАЕТ! Артем, спасибо за доброту и терпение. Дай мне знать, смогу ли я вернуть услугу. Ваше здоровье! - person Curtis Olson; 23.01.2018
comment
Прохладный! Достаточно оказанной вами услуги, и это подтверждение того, что функция работает. Ваше здоровье! Увидимся в других вопросах! Или даже не стесняйтесь внести свой вклад в проект! - person Artem Bilan; 23.01.2018