Получить сообщение от Kafka, отправить в Rsocket и получить его от клиента React

Я пытаюсь отправить данные из kafka с помощью облачного потока Spring в Rsocket, а затем представить данные в React

Вот моя конфигурация.

@Configuration
public class RsocketConsumerConfiguration {
    
    @Bean
    public Sinks.Many<Data> sender(){
        return Sinks.many().multicast().directBestEffort();
    }
    

}

@Controller общедоступный класс ServerController {

@Autowired
private Sinks.Many<Data> integer;

@MessageMapping("integer")
public Flux<Data> integer() {
    return  integer.asFlux();
}
@EnableBinding(IClientProcessor.class)
public class Listener {

    @Autowired
    private Sinks.Many<Data> integer;

    @StreamListener(IClientProcessor.INTEGER)
    public void integer(Data val) {
        System.out.println(val);
        integer.tryEmitNext(val);
    }

}

   let  client = new RSocketClient({
    transport: new RSocketWebSocketClient(
        {
            url: 'ws://localhost:7000/ws',
            wsCreator: (url) => new WebSocket(url),
            debug: true,
        },
        BufferEncoders,
    ),
    setup: {
        dataMimeType: "application/json",
        metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
        keepAlive: 5000,
        lifetime: 60000,
    },
});

  client
            .then(rsocket => {
                console.log("Connected to rsocket");
                rsocket.requestStream({
                    metadata: Buffer.from(encodeCompositeMetadata([
                        [MESSAGE_RSOCKET_ROUTING, encodeRoute("integer")],
                    ])),
                 
                })
                    .subscribe({
                        onSubscribe: s => {
                            s.request(2147483647)
                        },
                        onNext: (p) => {
                            let newData = {
                                time: new Date(JSON.parse(p.data).time).getUTCSeconds(),
                                integer: JSON.parse(p.data).integer
                            }
                           newData.integer >100?setInteger(currentData => [newData, ...currentData]):setInt(currentData => [newData, ...currentData])
                           console.log(newData)
                        },
                        onError: (e) => console.error(e),
                        onComplete: () => console.log("Done")
                    });

spring.cloud.stream.bindings.integer.destination = integer Невозможно увидеть это в приложении реакции. Пожалуйста, порекомендуйте. Что я делаю не так?


person meuhedet meuhedet    schedule 27.12.2020    source источник
comment
Удачи с этим?   -  person Yuri Schimke    schedule 30.12.2020
comment
обновленный вопрос с ответом   -  person meuhedet meuhedet    schedule 30.12.2020
comment
@meuhedetmeuhedet: не обновляйте текст вопроса решением! Лучше, если вы разместите решение как ответ на свой вопрос! Вы также можете отметить свой ответ как принятый. Так читателям легко увидеть, что проблема решена. Также читатели могут проголосовать за ваш ответ. :)   -  person Lii    schedule 30.12.2020
comment
@Lii много нового кода. куда мне его положить?   -  person meuhedet meuhedet    schedule 30.12.2020
comment
@meuhedetmeuhedet. Я не понимаю ... Разве вы не можете поместить код в блоки кода в текст ответа?   -  person Lii    schedule 30.12.2020
comment
будет долго и некрасиво   -  person meuhedet meuhedet    schedule 30.12.2020
comment
Аспект вопросов и ответов - критическая точка stackoverflow. Без этого он просто просит случайных незнакомцев в Интернете отладить вашу программу за вас.   -  person Yuri Schimke    schedule 30.12.2020
comment
Я имею в виду, наверное, что это такое. Просить случайного незнакомца отладить ваш код. Организованным способом   -  person meuhedet meuhedet    schedule 30.12.2020


Ответы (1)


Учитывая, что данные, похоже, идут напрямую от Kafka (через Spring) к клиенту, возможно, имеет смысл использовать потоковая передача сообщений Kafka через брокера обмена сообщениями в Интернете клиентам, подключенным к Интернету, через WebSockets.

Раскрытие информации: я не являюсь автором этой статьи, но работаю в той компании, где работает автор. Мы часто видим этот вариант использования, поэтому ожидайте, что этот подход может быть полезным.

person Matthew O'Riordan    schedule 25.01.2021