Примеры использования ReactorNettyWebSocketClient

В New Spring есть пример WebSocketClient на Документация Spring.

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);

Но очень коротко и непонятно:

  1. Как отправить сообщение на сервер (подписаться на канал)?
  2. Затем обрабатывать входящий поток и отправлять сообщения Flux?
  3. Переподключиться к серверу при обрыве соединения?

Может ли кто-нибудь привести более сложный пример?

УПД. Я попытался сделать что-то вроде:

public Flux<String> getStreaming() {

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();
    Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }");

    Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"),
            session -> session
                    .send(input.map(session::textMessage))
                    .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
                    .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

Но это возвращает только одно сообщение. Как будто у меня не было подписки.


person Aleksey Kozel    schedule 17.11.2017    source источник


Ответы (2)


Я предполагаю, что вы используете службу «эхо». Чтобы получить некоторые сообщения от службы, вы должны отправить их в веб-сокет, и служба «отзовет» их вам.

В вашем примере кода вы пишете только один элемент в веб-сокет. Как только вы отправите больше сообщений в сокет, вы получите больше обратно.

Я адаптировал код для подключения к ws://echo.websocket.org вместо локальной службы. Когда вы переходите к /stream, вы видите, что каждую секунду появляется новое сообщение.

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {

    Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date())))
        .delayElements(Duration.ofSeconds(1));

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono = client.execute(URI.create("ws://echo.websocket.org"), session -> session.send(input.map(session::textMessage))
        .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()).then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

Надеюсь это поможет...

person crixx    schedule 14.01.2018

Ссылка на документацию выше относится к временным документам до выпуска Spring Framework 5. В настоящее время ссылка предоставляет дополнительную информацию. о реализации WebSocketHandler.

person Rossen Stoyanchev    schedule 19.11.2018