Я пробовал некоторые новые функции в Java 9. Итак, я составил тест, чтобы издатель выдавал числа с заданной скоростью. Я также реализовал подписчика, чтобы слушать эти публикации и просто печатать их на консоли.
Хотя я, возможно, не совсем понимаю, как использовать этот Api, потому что метод onNext()
ничего не печатает, а getLastItem()
возвращает только 0.
Единственная часть, которая кажется работающей, - это onSubscribe()
, которая правильно инициализирует переменную lastItem
.
@Test
public void testReactiveStreams(){
//Create Publisher
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//Register Subscriber
TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
publisher.subscribe(subscriber);
assertTrue(publisher.hasSubscribers());
//Publish items
System.out.println("Publishing Items...");
List.of(1,2,3,4,5).stream().forEach(i -> {
publisher.submit(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// blah
}
});
assertEquals(5, subscriber.getLastItem());
publisher.close();
}
private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
private int lastItem;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed");
lastItem = 0;
}
@Override
public void onNext(Integer item) {
System.out.println("Received : "+item);
lastItem += 1; // expect increment by 1
assertTrue(lastItem == item);
}
@Override
public void onError(Throwable throwable) {
// nothing for the moment
}
@Override
public void onComplete() {
System.out.println("Completed");
}
public int getLastItem(){
return lastItem;
}
}
Кто-нибудь может сказать мне, что я делаю не так в своем тесте, пожалуйста? Я ожидал, что тест напечатает эти числа и вернет 5 в качестве последнего элемента.
Я должен сказать, что использую только Observables и Subjects в Angular2, хотя их легче понять.