Проблемы взаимодействия Spring Integraton RSocket и Spring RSocket

Я создал новый образец и вставил коды на клиентскую и серверную стороны.

Полные коды можно найти здесь.

Есть 3 версии серверной части.

  • сервер Нет Приложение Spring Boot, использующее Spring Integration RSocket InboundGateway.
  • загрузка сервера. Повторно используйте автоконфигурацию Spring RSocket и создавайте с ServerRSocketConnecter по ServerRSocketMessageHanlder.
  • server-boot-messsagemapping Не используйте Spring Integration, просто используйте автоконфигурацию Spring Boot RSocket и @Controller и @MessageMapping.

Есть 2 версии клиента.

  • клиент, отправка сообщений с помощью Spring Integration Rocket OutboundGateway.
  • клиент-запросчик. Отправляйте сообщения с помощью RSocketRequester, а не используйте Spring Integration.

Режим взаимодействия клиента и сервера - REQUEST_CHANNEL, и сервер подключается через TCP / localhost: 7000.

сервер

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

Класс приложения:

@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }

    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

загрузка сервера

Зависимости в pom.xml.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

application.properties

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

Класс приложения.

@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

отображение сообщений при загрузке сервера

Зависимости в pom.xml.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

Файл application.properties.

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

Класс приложения.

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}

клиент

В клиенте зависимости в pom.xml такие же.


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

Класс приложения:


@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}

При запуске клиентского и серверного приложений попробуйте получить доступ к http://localhost:8080/hello через curl.

При использовании server и server-boot, который использует InboundGateway для обработки сообщений, результат выглядит следующим образом.

curl http://localhost:8080/hello

data:ABCD

При использовании server-boot-messagemapping вывод работает, как я и ожидал:

data:A
data:B
data:C
data:D

заказчик

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

Класс приложения:

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@RestController
class HelloController {
    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}

При запуске этого клиента и 3-х серверов попробуйте получить доступ к http://localhost:8080/hello через curl.

При использовании server и server-boot, в котором для обработки сообщений используется InboundGateway, возникает исключение приведения к классу.

При использовании server-boot-messagemapping вывод работает, как я и ожидал:

data:A
data:B
data:C
data:D

Не знаю, где проблема настройки InboundGateway и OutboundGateway?


person Hantsy    schedule 29.02.2020    source источник


Ответы (1)


Спасибо за такой подробный образец!

Итак, что я вижу. Оба клиента (простой RSocketRequester и Spring Integration) хорошо работают с обычным сервером RSocket.

Чтобы заставить их работать с сервером Spring Integration, вам необходимо внести следующие изменения:

  1. На стороне сервера:

Добавьте .requestElementType(ResolvableType.forClass(String.class)) в определение RSockets.inboundGateway(), чтобы он знал, во что преобразовывать входящие данные.

  1. На стороне клиента:

    .data(Flux.just("a\n", "b\n", "c\n", "d\n")).

В настоящее время серверная часть Spring Integration не обрабатывает входящий Flux как поток независимых данных. Итак, мы пытаемся объединить их все в одно значение. Новый разделитель строк - это индикатор того, что мы ожидаем независимых значений. Spring Messaging со своей стороны делает прямо противоположное: он проверяет multi-value ожидаемый тип и декодирует каждый элемент входящего Flux в своем map() вместо попытки всего Publisher декодирования.

Это будет своего рода критическое изменение, но, возможно, необходимо подумать о том, чтобы исправить RSocketInboundGateway логику, чтобы она соответствовала обычной @MessageMapping поддержке RSocket. Не стесняйтесь поднимать вопрос о GH!

person Artem Bilan    schedule 02.03.2020
comment
Создана проблема spring-projects / spring-integration # 3207 - person Hantsy; 03.03.2020