Один поток и несколько подписчиков

Я тестирую воду с реактивными потоками Java9 и RxJava2. У меня действительно нет предпочтений, но я ищу некоторые рекомендации, возможно ли это.

  1. Я создаю предварительно настроенное количество подписчиков, например:

    for(int i = 0; i<MAX_SUBSCRIBERS; i++) {  
         System.out.println("Creating subscriber: " + i);  
         publisher.subscribe(new MySubscriber<>(i + "-subscriber"));   
    }
    
  2. Я читаю список файлов из каталога для одновременной загрузки в какую-то стороннюю систему.

    Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"));
    paths
    .filter((Files::isRegularFile))
    .forEach(pathName -> publisher.submit(pathName.toString()));
    

Я получаю следующий вывод:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    0-subscriber: /my/dir/with/files/test1.txt received in onNext
    1-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

В идеале мы должны увидеть следующее поведение. Каждый подписчик должен выполнять работу над уникальным файлом.

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

Это возможно? Любые советы будут потрясающими!


person massnerder    schedule 09.01.2018    source источник


Ответы (3)


Java 9 Flow API состоит из 4 интерфейсов и класса SubmissionPublisher, который отправляет каждое отправленное значение всем своим Subscriber. В настоящее время нет инструментов JDK для поддержки вашего потока данных.

Напротив, RxJava — это богатая библиотека с сотнями операторов, где вы можете выполнять параллельную обработку без дублирования:

    ParallelFlowable<Path> pf = 
            Flowable.<Path, Stream<Path>>using(
                () -> Files.list(Paths.get("/my/dir/with/files")),
                files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
                AutoCloseable::close
            )
            .parallel(2)
            .runOn(Schedulers.computation())
            .filter(Files::isRegularFile);

pf.subscribe(new Subscriber[] {
    new MySubscriber<>("0-subscriber"),
    new MySubscriber<>("1-subscriber"),
});
person akarnokd    schedule 09.01.2018

Это был комментарий, но он стал слишком длинным. Однако это не настоящий ответ, потому что я не эксперт по реактивным потокам. Это скорее пища для размышлений. ????

Насколько я понимаю, каждый подписчик видит все опубликованные элементы и что подписчики должны быть независимы друг от друга (что, я бы сказал, исключает явную координацию). Если между файлами есть существенная разница (скажем, один из них — PDF, а другой — TXT), то подписчики могут принять решение действовать только с теми типами, для которых они были созданы, но в противном случае каждый должен обрабатывать каждый элемент.

Похоже, вы пытаетесь распределить рабочую нагрузку между несколькими подписчиками, которые, как я предполагаю, работают в разных потоках. Это определенно то, с чем существующие конструкции параллелизма справляются очень хорошо. Взгляните на ExecutorService, Например.

Тем не менее, если вы строите более крупный потоковый конвейер, я не вижу аргументов против инкапсуляции части распределения обработки файлов между потоками в одном подписчике. Это может быть даже сам издатель, публикующий результат обработки каждого файла, как только это будет сделано.

Последнее предостережение: возможно, у RxJava есть что-то в рукаве для этого конкретного случая использования. Мне любопытно прочитать другие ответы.

person Nicolai Parlog    schedule 09.01.2018

Издатели могут быть двух видов: многоадресные и одноадресные. Издатели многоадресной рассылки передают каждому подписчику полный набор сообщений, в то время как издатели одноадресной рассылки направляют каждое сообщение одному подписчику. SubmissionPublisher запрограммирован как мультикаст, сказано в его документации.

Реализации одноадресных издателей можно найти в моей библиотеке DF4J. Ищите реализации интерфейса org.df4j.protocol.Flow.Publisher, который расширяет org.reactivestreams.Publisher

person Alexei Kaigorodov    schedule 04.03.2020