с пружинной загрузкой rsocket захватывает тип отмены кадра

У меня есть реализация rsocket с весенней загрузкой, где, если клиент отменяет или закрывает свой запрос rsocket, я хочу отменить другие регистрации подписки на сервере.

В журналах на сервере весенней загрузки я вижу, что сообщение об отмене отправлено или получено:

WARN i.r.t.n.s.WebsocketServerTransport$1 [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

Как мне захватить и обработать этот сигнал отмены?

Я пробовал отменить конечные точки, но они не фиксируют сигнал:

@MessageMapping("cancel")
Flux<Object> onCancel() {
    log.info("Captured cancel signal");
}

or

@ConnectMapping("cancel")
Flux<Object> onCancel2() {
    log.info("Captured cancel2 signal");
}

Этот вопрос по отмене подписок, возможно, связан с этим вопросом, и этот вопрос с обнаружение отключения веб-сокета


person rupweb    schedule 09.03.2020    source источник


Ответы (2)


Для захвата сигнала отмены вы можете использовать подписку на событие onClose().

В вашем контроллере

@Controller
class RSocketConnectionController {

    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
//        rSocketRequester.rsocket().dispose()   //to reject connection
        rSocketRequester
                .rsocket()
                .onClose()
                .subscribe(null, null, {
                    log.info("{} just disconnected", clientId)

                    //TODO here whatever you want
                })
    }
}

Ваш клиент должен правильно отправить кадр SETUP, чтобы вызвать этот @ConnectMapping. Если вы используете rsocket-js, вам нужно добавить такую ​​полезную нагрузку:

const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {
          //for connection mapping on server
          payload: {
            data: 'unique-client-id',   //TODO you can receive this data on server side
            metadata: String.fromCharCode("client-id".length) + "client-id"
          },
          // ms btw sending keepalive to server
          keepAlive: 60000,
.....
        }
});

person kojot    schedule 30.04.2020
comment
Спасибо, я не смог заставить это работать, но ответил на вопрос cancel() ниже. До сих пор не знаю о WebSocket Close Frame - person rupweb; 30.04.2020

Это был не очень хорошо поставленный вопрос. Ответ в том, что

INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

виден FluxSink, который был настроен из исходной @MessageMapping конечной точки.

Например:

@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {       
    return myService.generateWorld(message);
}

В myService классе

public Flux<Object> generateWorld(String hello) {
    EmitterProcessor<Object> emitter = EmitterProcessor.create();
    FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

    // doing stuff with sink here
    sink.next(stuff());

    // This part will handle a cancel from the client
    sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});

    return Flux.from(emitter));  
}

sink.onCancel() будет обрабатывать отмену потока к конечной точке hello от клиента.

person rupweb    schedule 30.04.2020
comment
В более поздних версиях и Sink.Many() используйте это - person rupweb; 31.03.2021