Как можно настроить клиент с помощью Reactor Netty HttpClient для отправки более одного элемента с помощью издателя Flux на сервер?

Более конкретный вопрос: почему Flux.fromIterable () не работает с Reactor Netty HttpClient? Этот простой пример отлично работает. Все 10 элементов выпущены издателем Flux:

public class ConcurrentLinkedQueueFluxTest {

    public static final void main(String[] args) {

        List<Integer> aList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
        aList.stream().forEach(i -> clq.add(i));

        Flux.fromIterable(clq)
                .subscribe(new ReactiveSubscriber<Integer>());
    }
}

Используя ReactiveSubscriber:

class ReactiveSubscriber<T> extends BaseSubscriber<T> {

    private Subscription subscription;

    @Override
    public void hookOnSubscribe(Subscription s) {
        System.out.println("In hookOnSubscribe");
        this.subscription = s;
        subscription.request(1);
    }

    @Override
    public void hookOnNext(T response) {
        System.out.println("In hookOnNext: "+response.toString());
        subscription.request(1);
    }

    @Override
    public void hookOnError(Throwable t) {
        System.out.println(t.getLocalizedMessage());
    }

    @Override
    public void hookOnComplete() {
        System.out.println("In hookOnComplete");
        subscription.cancel();
    }
}

Как показано ниже, если я использую аналогичного подписчика с HttpClient.send (Flux.fromIterable ()), выдается только 1 элемент, а не все элементы в очереди. Значит, что-то настроено неправильно, поскольку этот метод Flux.fromIterable (), который создает Flux, не работает с HttpClient.

Что касается собственно производственного кода, я включил определение очереди, метод клиента, подписчика, метод сервера и журнал, который показывает, что только 1 из 5 элементов отправляется с клиента на сервер. Похоже, что даже несмотря на то, что метод HttpClient send () имеет объект Flux, загруженный из очереди, отправляется только один элемент, хотя в очереди 5 элементов.

Элементы для отправки на сервер находятся в очереди типа ByteBuf:


    private ConcurrentLinkedQueue<ByteBuf> electionRequestQueue;

    public ElectionTransactionRequest() {
        electionRequestQueue = new ConcurrentLinkedQueue<ByteBuf>();
    }

Клиентский метод:


     public void task() {

        log.debug("Queue size: "+electionRequestQueue.size());

        ElectionTransactionSubscriber etSubscriber = new ElectionTransactionSubscriber();

        HttpClient.create()
             .tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
             .port(61005)
             .protocol(HttpProtocol.HTTP11)
             .post()
             .uri("/election/transaction")
             .send(Flux.fromIterable(electionRequestQueue))
             .responseContent()
             .aggregate()
             .asByteArray()
             .subscribe(etSubscriber);
     }

Подписчик определяется как:



class ElectionTransactionSubscriber extends BaseSubscriber<byte[]> {

    private static final Logger log = LoggerFactory.getLogger(ElectionTransactionSubscriber.class);

    private Subscription subscription;

    @Override
    public void hookOnSubscribe(Subscription s) {
        log.debug("In hookOnSubscribe");
        this.subscription = s;
        subscription.request(1);
    }

    @Override
    public void hookOnNext(byte[] response) {
        log.info("In hookOnNext");
        subscription.request(1);
    }

    @Override
    public void hookOnError(Throwable t) {
        log.error(t.getLocalizedMessage());
    }

    @Override
    public void hookOnComplete() {
        log.debug("In hookOnComplete");
        subscription.cancel();
    }
}

Сторона сервера определяется в методе:


    public void start() {

        disposableServer =
            HttpServer.create()
                .host("localhost")
                .port(61005)
                .protocol(HttpProtocol.HTTP11)
                .route(routes ->
                    routes
                        .post("/election/transaction",
                            (request, response) -> response.send(request
                                                                .receive()
                                                                .aggregate()
                                                                .flatMap(aggregatedBody ->
                                                                    electionTransactionHandler.electionTransactionResponse(aggregatedBody)))))
                        .bindNow();
        disposableServer.onDispose().block();

    }

Когда клиент запущен, в очереди 5 элементов, но только один элемент отправляется на сервер, как показано в журнале. В подписчике метод hookOnComplete () вызывается после отправки только 1 элемента от издателя Flux.


2020-12-01 12:03:56,442 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionRequest: Queue size: 5

2020-12-01 12:03:56,539 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnSubscribe

2020-12-01 12:03:56,746 INFO  [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnNext

2020-12-01 12:03:56,746 DEBUG [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnComplete


person Chris Grimes    schedule 01.12.2020    source источник
comment
Вы нашли способ?   -  person Rémi Roy    schedule 19.02.2021


Ответы (1)


Вам, вероятно, будет проще всего использовать буферизацию. Он испускает буферы (которые по умолчанию являются Collection - List). Итак, в вашем случае подписчик получит список

StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();

=========================

Это прямо из документации для активной зоны реактора:

Три вида дозирования

Когда у вас много элементов и вы хотите разделить их на партии, у вас есть три широких решения в Reactor: группировка, управление окнами и буферизация. Эти три концептуально близки, потому что они перераспределяют поток в агрегат. Группирование и создание окон создают поток ‹Flux›, а агрегаты буферизируются в коллекцию. Группировка с помощью Flux ‹GroupedFlux›

Группировка - это процесс разделения исходного потока на несколько пакетов, каждая из которых соответствует ключу.

Связанный оператор - groupBy.

Каждая группа представлена ​​как GroupedFlux, который позволяет получить ключ, вызвав его метод key ().

Нет необходимой преемственности в содержании групп. Как только исходный элемент создает новый ключ, группа для этого ключа открывается, и элементы, соответствующие этому ключу, попадают в группу (несколько групп могут быть открыты одновременно).

Это означает, что группы:

Are always disjoint (a source element belongs to one and only one group).

Can contain elements from different places in the original sequence.

Are never empty.

В следующем примере значения группируются по четности или нечетности:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .groupBy(i -> i % 2 == 0 ? "even" : "odd")
        .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
                .map(String::valueOf) //map to string
                .startWith(g.key())) //start with the group's key
    )
    .expectNext("odd", "1", "3", "5", "11", "13")
    .expectNext("even", "2", "4", "6", "12")
    .verifyComplete();

Предупреждение Группировка лучше всего подходит, когда у вас среднее или небольшое количество групп. Группы также должны обязательно потребляться (например, с помощью flatMap), чтобы groupBy продолжал получать данные из восходящего потока и кормить больше групп. Иногда эти два ограничения умножаются и приводят к зависанию, например, когда у вас высокая мощность и параллелизм flatMap, потребляющего группы, слишком низок. Создание окон с помощью Flux ‹Flux›

Создание окон - это процесс разделения исходного потока Flux на окна по критериям размера, времени, предикатов, определяющих границы, или определяющих границы Publisher.

Связанные операторы: window, windowTimeout, windowUntil, windowWhile и windowWhen.

В отличие от groupBy, которая случайным образом перекрывается в соответствии с входящими ключами, окна (большую часть времени) открываются последовательно.

Однако некоторые варианты все еще могут пересекаться. Например, в окне (int maxSize, int skip) параметр maxSize - это количество элементов, после которых окно закрывается, а параметр skip - это количество элементов в источнике, после которого открывается новое окно. Таким образом, если maxSize ›пропустить, новое окно открывается раньше, чем закрывается предыдущее, и два окна перекрываются.

В следующем примере показаны перекрывающиеся окна:

StepVerifier.create(
    Flux.range(1, 10)
        .window(5, 3) //overlapping windows
        .concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
    )
        .expectNext(1, 2, 3, 4, 5)
        .expectNext(4, 5, 6, 7, 8)
        .expectNext(7, 8, 9, 10)
        .expectNext(10)
        .verifyComplete();

Примечание. При обратной конфигурации (maxSize ‹skip) некоторые элементы из источника удаляются и не являются частью какого-либо окна.

В случае работы с окнами на основе предикатов через windowUntil и windowWhile наличие последующих исходных элементов, не соответствующих предикату, также может привести к пустым окнам, как показано в следующем примере:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .windowWhile(i -> i % 2 == 0)
        .concatMap(g -> g.defaultIfEmpty(-1))
    )
        .expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
        .expectNext(2, 4, 6) // triggered by 11
        .expectNext(12) // triggered by 13
        // however, no empty completion window is emitted (would contain extra matching elements)
        .verifyComplete();

Буферизация с помощью Flux ‹List›

Буферизация похожа на работу с окнами со следующей особенностью: вместо того, чтобы генерировать окна (каждое из которых является потоком), она генерирует буферы (которые являются коллекцией - по умолчанию - списком).

Операторы для буферизации отражают операторы для работы с окнами: buffer, bufferTimeout, bufferUntil, bufferWhile и bufferWhen.

Когда соответствующий оконный оператор открывает окно, оператор буферизации создает новую коллекцию и начинает добавлять в нее элементы. Когда окно закрывается, оператор буферизации испускает коллекцию.

Буферизация также может привести к отбрасыванию исходных элементов или перекрытию буферов, как показано в следующем примере:

StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();

В отличие от работы с окнами, bufferUntil и bufferWhile не генерируют пустой буфер, как показано в следующем примере:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .bufferWhile(i -> i % 2 == 0)
    )
    .expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
    .expectNext(Collections.singletonList(12)) // triggered by 13
    .verifyComplete();

https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/advancedFeatures.adoc

person Susan Mustafa    schedule 01.12.2020
comment
Спасибо за ответ, я отредактировал вопрос, чтобы он был более конкретным. Я не думаю, что buffer () исправит проблему. Настоящая проблема заключается в том, что статический метод Flux.fromIterable () не работает с HttpClient. В добавленном мною простом примере, который не использует HttpClient, Flux.fromIterable () работает нормально, когда загруженная очередь передается методу. Я предполагаю, что что-то настроено неправильно, поскольку Flux.fromIterable () не работает с HttpClient. Вы можете помочь мне исправить мою настройку? - person Chris Grimes; 02.12.2020
comment
@ChrisGrimes, извини, я не могу ответить, почему это не работает. Глядя на официальную документацию, похоже, что они используют HttpClient.create () .post () .uri (example.com) .send (Flux.just (bb1, bb2, bb3)) .responseSingle ((res, content) - ›Mono.just (res.status (). code ())) .block (); HttpClient.create () .baseUri (example.com) .post () .send (ByteBufFlux.fromString (flux)) .responseSingle ((res, content) - ›Mono.just (res.status (). code ())) .block (); - person Susan Mustafa; 02.12.2020
comment
@ChrisGrimeshttps: //projectreactor.io/docs/netty/0.8.4.RELEASE/api/reactor/netty/http/client/HttpClient.html Они используют Flux.just и Flux.fromString - person Susan Mustafa; 02.12.2020