TL; DR: что требуется для настройки приложения Spring Boot, которое предоставляет интерфейс RSocket, поддерживающий транспорт WebSocket?
Я одновременно изучаю RSocket и Spring Boot, так что, пожалуйста, потерпите меня.
В своей борьбе мне удалось создать очень простую и надуманную реализацию приложения Spring Boot, которое использует API, предоставляемый / предоставляемый вторым приложением Spring Boot, использующим RSocket в качестве протокола, однако я могу достичь этого только тогда, когда используя TcpClientTransport
.
С моей точки зрения, WebsocketTransport
гораздо более вероятно будет использоваться и более полезен для клиент-> серверных архитектур, однако я не нашел никаких рабочих примеров или документации о том, как правильно настроить приложение Spring Boot, которое принимает сообщения RSocket с помощью WebSocket. как транспорт.
Странно то, что в моих тестах оказалось, что мой потребитель (клиент) действительно устанавливает соединение WebSocket с сервером / производителем, однако «рукопожатие», похоже, зависает, и соединение никогда не устанавливается полностью. Я тестировал как библиотеки JavaScript (rsocket-websocket-client, rsocket-rpc-core и т.д.), так и библиотеки Java (io.rsocket.transport.netty.client.WebsocketClientTransport), и сервер, похоже, демонстрирует то же самое. поведение независимо.
Повторюсь, используя TCPTransport, я могу подключиться к серверу и вызывать запросы, но при использовании WebsocketTransport
соединение никогда не устанавливается.
Что требуется от приложения Spring Boot, которое нацелено на поддержку RSocket через WebsocketClientTransport
, не потребляя spring-boot-starter-rsocket
в качестве зависимости ?.
Сервер
pom.xml
...
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.M5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
...
application.properties
spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*
SpringBootRSocketServerApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootRSocketServerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootRSocketServerApplication.class, args);
}
}
UserRSocketController
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
@Slf4j
@Controller
public class UserRSocketController {
@Autowired
private UserRepository userRepository;
@MessageMapping("usersList")
public Mono<List<User>> usersList() {
log.info("Handling usersList request.");
return Mono.just(this.userRepository.getUsers());
}
@MessageMapping("usersStream")
Flux<User> usersStream(UserStreamRequest request) {
log.info("Handling request for usersStream.");
List<User> users = userRepository.getUsers();
Stream<User> userStream = Stream.generate(() -> {
Random rand = new Random();
return users.get(rand.nextInt(users.size()));
});
return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
}
@MessageMapping("userById")
public Mono<User> userById(GetUserByIdRequest request) {
log.info("Handling request for userById id: {}.", request.getId());
return Mono.just(this.userRepository.getUserById(request.getId()));
};
}
Ведение журнала запуска
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:02,986 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)
Потребитель / Клиент
ClientConfiguration.java
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
// ClientTransport transport = TcpClientTransport.create(8081);
// ^--- TCPTransport works fine
ClientTransport transport = WebsocketClientTransport.create(8081);
// ^--- Connection hangs and application startup stalls
return RSocketFactory
.connect()
.mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(transport)
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
Ведение журнала запуска
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:52,331 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default