Как поддерживать транспорт WebSocket с сервером Spring Boot RSocket?

TL; DR: что требуется для настройки приложения Spring Boot, которое предоставляет интерфейс RSocket, поддерживающий транспорт WebSocket?


Я одновременно изучаю RSocket и Spring Boot, так что, пожалуйста, потерпите меня.

В своей борьбе мне удалось создать очень простую и надуманную реализацию приложения Spring Boot, которое использует API, предоставляемый / предоставляемый вторым приложением Spring Boot, использующим RSocket в качестве протокола, однако я могу достичь этого только тогда, когда используя TcpClientTransport.

С моей точки зрения, WebsocketTransport гораздо более вероятно будет использоваться и более полезен для клиент-> серверных архитектур, однако я не нашел никаких рабочих примеров или документации о том, как правильно настроить приложение Spring Boot, которое принимает сообщения RSocket с помощью WebSocket. как транспорт.

Странно то, что в моих тестах оказалось, что мой потребитель (клиент) действительно устанавливает соединение WebSocket с сервером / производителем, однако «рукопожатие», похоже, зависает, и соединение никогда не устанавливается полностью. Я тестировал как библиотеки JavaScript (rsocket-websocket-client, rsocket-rpc-core и т.д.), так и библиотеки Java (io.rsocket.transport.netty.client.WebsocketClientTransport), и сервер, похоже, демонстрирует то же самое. поведение независимо.

Повторюсь, используя TCPTransport, я могу подключиться к серверу и вызывать запросы, но при использовании WebsocketTransport соединение никогда не устанавливается.

Что требуется от приложения Spring Boot, которое нацелено на поддержку RSocket через WebsocketClientTransport, не потребляя spring-boot-starter-rsocket в качестве зависимости ?.

Сервер


pom.xml

...

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

...

application.properties

spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*

SpringBootRSocketServerApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRSocketServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRSocketServerApplication.class, args);
    }
}

UserRSocketController

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;

@Slf4j
@Controller
public class UserRSocketController {

    @Autowired
    private UserRepository userRepository;

    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        Stream<User> userStream = Stream.generate(() -> {
            Random rand = new Random();
            return users.get(rand.nextInt(users.size()));
        });
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        return Mono.just(this.userRepository.getUserById(request.getId()));
    };
}

Ведение журнала запуска

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:02,986 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO  [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO  [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO  [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO  [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)

Потребитель / Клиент


ClientConfiguration.java

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine

        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls

        return RSocketFactory
            .connect()
            .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(transport)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

Ведение журнала запуска

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:52,331 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default

person Vigs    schedule 09.09.2019    source источник


Ответы (2)


Чтобы приложение RSocket отображало конечные точки с помощью транспорта websocket, вам нужно всего две вещи:

Во-первых, вам потребуются зависимости как от webflux, так и от rsocket, так как вам, вероятно, также потребуется обслуживать веб-страницы и статические ресурсы:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

Затем вам необходимо соответствующим образом настроить сервер RSocket в вашем application.properties файле:

#server.port=8080 this is already the default
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket

Подробнее об этом вы найдете в Справочная документация Spring Boot о RSocket.

Теперь клиент websocket может подключиться к ws://localhost:8080/rsocket.

Обратите внимание, что в текущем выпуске SNAPSHOT версии 2.2.0 протокол RSocket претерпел изменения, и библиотека rsocket-js в настоящее время догоняет его, особенно в части поддержки метаданных. Здесь вы также найдете рабочий образец.

Что касается Java-клиента, Spring Boot предоставляет вам RSocketRequester.Builder, который уже настроен и настроен в соответствии с вашими потребностями с помощью кодеков и перехватчиков:

@Component
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder builder) {
        this.rsocketRequester = builder
                .connectWebSocket(URI.create("ws://localhost:8080/rsocket"))
                .block();
    }
}
person Brian Clozel    schedule 10.09.2019

На основании этого сообщения в блоге правильный порт для подключения порт, настроенный через server.port=8080.

введите здесь описание изображения

Конфигурация сервера

server.port=8080
spring.rsocket.server.port=8081
spring.rsocket.server.mapping-path=/ws
spring.rsocket.server.transport=websocket

Конфигурация клиентского клиента Java

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

import java.net.URI;
import java.time.Duration;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        URI websocketUri = URI.create("ws://127.0.0.1:8080/ws");
        WebsocketClientTransport ws = WebsocketClientTransport.create(websocketUri);
        return RSocketFactory
            .connect()
            .mimeType(
                MetadataExtractor.ROUTING.toString(),
                MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(ws)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
            rSocket(),
            MimeTypeUtils.APPLICATION_JSON,
            MetadataExtractor.ROUTING,
            rSocketStrategies);
    }
}

Конфигурация клиента JavaScript

import { RSocketClient, JsonSerializers } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

const transport = new RSocketWebSocketClient({
    url: 'ws://127.0.0.1:8080/ws'
});

const client = new RSocketClient({
    // send/receive JSON objects instead of strings/buffers
    serializers: JsonSerializers,
    setup: {
        // ms btw sending keepalive to server
        keepAlive: 60000,

        // ms timeout if no keepalive response
        lifetime: 180000,

        // format of `data`
        dataMimeType: 'application/json',

        // format of `metadata`
        metadataMimeType: 'application/json',
    },
    transport,
});

client.connect().then((rsocket) => {
    // work with rsocket
});
person Vigs    schedule 10.09.2019