Получение элементов из реактивного потока SubmissionPublisher

Я пробовал некоторые новые функции в Java 9. Итак, я составил тест, чтобы издатель выдавал числа с заданной скоростью. Я также реализовал подписчика, чтобы слушать эти публикации и просто печатать их на консоли.

Хотя я, возможно, не совсем понимаю, как использовать этот Api, потому что метод onNext() ничего не печатает, а getLastItem() возвращает только 0.

Единственная часть, которая кажется работающей, - это onSubscribe(), которая правильно инициализирует переменную lastItem.

@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);

    assertTrue(publisher.hasSubscribers());

    //Publish items
    System.out.println("Publishing Items...");

    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());

    publisher.close();
}


private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {

    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }

    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }

    public int getLastItem(){
        return lastItem;
    }
}

Кто-нибудь может сказать мне, что я делаю не так в своем тесте, пожалуйста? Я ожидал, что тест напечатает эти числа и вернет 5 в качестве последнего элемента.

Я должен сказать, что использую только Observables и Subjects в Angular2, хотя их легче понять.


person nuvio    schedule 14.03.2017    source источник


Ответы (2)


Flow API реализует функцию под названием backpressure (объясняется здесь в контексте RxJava), что означает, что издатель не должен иметь возможности перегружать подписчика, публикуя элементы быстрее, чем он может их обработать. Способ, которым JDK 9 реализует это, заключается в наличии элементов запроса подписчика из подписки.

Для вашего теста TestIntegerSubscriber должен запрашивать элементы onSubscription, скажем 10, и отслеживать, как часто вызывается onNext, чтобы он мог запрашивать больше после того, как 10 элементов были отправлены.

Я написал раздел о Flow API, в котором еще немного подробнее. Он также описывает взаимодействие между издателем, подписчиком и подпиской:

  1. Создайте Publisher и Subscriber.
  2. Подпишитесь подписчика с Publisher::subscribe.
  3. Издатель создает Subscription и вызывает с ним Subscriber::onSubscription, чтобы подписчик мог сохранить подписку.
  4. В какой-то момент подписчик вызывает Subscription::request, чтобы запросить несколько элементов.
  5. Издатель начинает передавать элементы подписчику, вызывая Subscriber::onNext. Он никогда не будет публиковать больше, чем запрошенное количество элементов.
  6. Издатель может в какой-то момент исчерпать свои ресурсы или столкнуться с проблемами и позвонить Subscriber::onComplete или Subscriber::onError соответственно.
  7. Подписчик может либо продолжать время от времени запрашивать дополнительные элементы, либо разорвать соединение, вызвав Subscription::cancel.
person Nicolai Parlog    schedule 15.03.2017

Я провел еще несколько исследований и вижу, что код можно заставить работать, добавив член Subscription в реализацию Subscriber следующим образом:

public class MySubscriber<T> implements Subscriber<T> {  
  private Subscription subscription;  

...

@Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Subscribed");
            subscription.request(1);
            lastItem = 0;
        }

...

  @Override
        public void onNext(Integer item) {
            System.out.println("Received : "+item);
            lastItem += 1; // expect increment by 1
            assertTrue(lastItem == item);
            subscription.request(1);
        }

где subscription.request(1); я думаю, это означает, что я потребляю 1 товар от издателя. Я не знаю ... на самом деле было бы хорошо, если бы кто-нибудь мне это разъяснил.

person nuvio    schedule 14.03.2017
comment
0 голос против. Вызов метода subscription.request (1) означает, что у подписчика есть место для приема сообщения I от издателя. После этого звонка издатель один раз вызывает subscriber.onNext () и ждет, чтобы подписчик разрешил ему ввести дополнительные данные. - person Alexei Kaigorodov; 25.04.2017