Более конкретный вопрос: почему Flux.fromIterable () не работает с Reactor Netty HttpClient? Этот простой пример отлично работает. Все 10 элементов выпущены издателем Flux:
public class ConcurrentLinkedQueueFluxTest {
public static final void main(String[] args) {
List<Integer> aList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
aList.stream().forEach(i -> clq.add(i));
Flux.fromIterable(clq)
.subscribe(new ReactiveSubscriber<Integer>());
}
}
Используя ReactiveSubscriber:
class ReactiveSubscriber<T> extends BaseSubscriber<T> {
private Subscription subscription;
@Override
public void hookOnSubscribe(Subscription s) {
System.out.println("In hookOnSubscribe");
this.subscription = s;
subscription.request(1);
}
@Override
public void hookOnNext(T response) {
System.out.println("In hookOnNext: "+response.toString());
subscription.request(1);
}
@Override
public void hookOnError(Throwable t) {
System.out.println(t.getLocalizedMessage());
}
@Override
public void hookOnComplete() {
System.out.println("In hookOnComplete");
subscription.cancel();
}
}
Как показано ниже, если я использую аналогичного подписчика с HttpClient.send (Flux.fromIterable ()), выдается только 1 элемент, а не все элементы в очереди. Значит, что-то настроено неправильно, поскольку этот метод Flux.fromIterable (), который создает Flux, не работает с HttpClient.
Что касается собственно производственного кода, я включил определение очереди, метод клиента, подписчика, метод сервера и журнал, который показывает, что только 1 из 5 элементов отправляется с клиента на сервер. Похоже, что даже несмотря на то, что метод HttpClient send () имеет объект Flux, загруженный из очереди, отправляется только один элемент, хотя в очереди 5 элементов.
Элементы для отправки на сервер находятся в очереди типа ByteBuf:
private ConcurrentLinkedQueue<ByteBuf> electionRequestQueue;
public ElectionTransactionRequest() {
electionRequestQueue = new ConcurrentLinkedQueue<ByteBuf>();
}
Клиентский метод:
public void task() {
log.debug("Queue size: "+electionRequestQueue.size());
ElectionTransactionSubscriber etSubscriber = new ElectionTransactionSubscriber();
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.port(61005)
.protocol(HttpProtocol.HTTP11)
.post()
.uri("/election/transaction")
.send(Flux.fromIterable(electionRequestQueue))
.responseContent()
.aggregate()
.asByteArray()
.subscribe(etSubscriber);
}
Подписчик определяется как:
class ElectionTransactionSubscriber extends BaseSubscriber<byte[]> {
private static final Logger log = LoggerFactory.getLogger(ElectionTransactionSubscriber.class);
private Subscription subscription;
@Override
public void hookOnSubscribe(Subscription s) {
log.debug("In hookOnSubscribe");
this.subscription = s;
subscription.request(1);
}
@Override
public void hookOnNext(byte[] response) {
log.info("In hookOnNext");
subscription.request(1);
}
@Override
public void hookOnError(Throwable t) {
log.error(t.getLocalizedMessage());
}
@Override
public void hookOnComplete() {
log.debug("In hookOnComplete");
subscription.cancel();
}
}
Сторона сервера определяется в методе:
public void start() {
disposableServer =
HttpServer.create()
.host("localhost")
.port(61005)
.protocol(HttpProtocol.HTTP11)
.route(routes ->
routes
.post("/election/transaction",
(request, response) -> response.send(request
.receive()
.aggregate()
.flatMap(aggregatedBody ->
electionTransactionHandler.electionTransactionResponse(aggregatedBody)))))
.bindNow();
disposableServer.onDispose().block();
}
Когда клиент запущен, в очереди 5 элементов, но только один элемент отправляется на сервер, как показано в журнале. В подписчике метод hookOnComplete () вызывается после отправки только 1 элемента от издателя Flux.
2020-12-01 12:03:56,442 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionRequest: Queue size: 5
2020-12-01 12:03:56,539 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnSubscribe
2020-12-01 12:03:56,746 INFO [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnNext
2020-12-01 12:03:56,746 DEBUG [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnComplete