Потоки Java 25. Сбор 1. Пользовательский сборщик

Терминальная операция либо возвращает одно значение (того же или другого типа, чем тип ввода), либо вообще ничего не возвращает (производит только побочные эффекты). Он не позволяет применить другую операцию после себя и закрывает поток.

В этом посте мы начнем обсуждение последней из терминальных операций, называемой collect():

R collect(Коллектор‹T,A,R› коллектор)

Это специализация операции reduce(). Он позволяет реализовать широкий спектр алгоритмов, используя готовые реализации сборщиков из класса java.util.stream.Collectors. О каждом из них мы поговорим в следующих постах.

Вы также можете реализовать свой собственный сборщик, используя перегруженную версию метода collect():

R collect (Поставщик‹R› поставщик, BiConsumer‹R,T› накопитель, BiConsumer‹R,R› объединитель)

Это то, что мы собираемся обсудить сегодня.

Использование пользовательских сборщиков и сборщиков классов

Для демонстрации мы будем использовать следующий класс Box:

class Box {
      int weight;
      String color;
      public Box(){}
      public Box(int weight, String color) {
        this.weight = weight;
        this.color = color;
      }
      public int getWeight() { return weight; 
      public void setWeight(int weight) { this.weight = weight;}
      public String getColor() { return color; }
      public void setColor(String color) { this.color = color; }
      @Override
      public String toString() {
        return "Box{weight=" + weight +
                ", color='" + color + "'}";
      }
    }

Давайте реализуем сборщик, который находит самый тяжелый из ящиков:

BiConsumer<Box, Box> accumulator = (b1, b2) -> {
      if(b1.getWeight() < b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  BiConsumer<Box, Box> combiner = (b1, b2) -> {
      System.out.print("Combiner is called!");
      if(b1.getWeight() < b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  Box theHeaviest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
        .collect(Box::new, accumulator, combiner);
  System.out.print(theHeaviest);  
                      //prints: Box{weight=8, color='green'}

Результат правильный, но комбайнер не вызывался (не выводилось сообщение «Комбайн вызван!»). Это связано с тем, что объединитель используется только для обработки параллельных потоков, чтобы объединить результаты нескольких подпроцессов, выполняемых параллельно.

Чтобы продемонстрировать это, давайте преобразуем наш поток в параллельный:

Box theHeaviest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
      .parallel()
      .collect(Box::new, accumulator, combiner); 
            //prints: Combiner is called!Combiner is called!
  System.out.print(theHeaviest); 
                      //prints: Box{weight=8, color='green'}

Как видите, теперь комбайнер вызывался дважды (у нас три элемента, значит два нужно объединить с первым).

Если это кажется слишком много, у нас есть хорошие новости для вас. Скорее всего, вам никогда не понадобится создавать собственный сборщик, потому что многие из них могут быть сгенерированы фабричными методами класса Collectors или другими утилитами, которые мы обсудим позже. Мы демонстрируем пользовательский сборщик по двум причинам:

— чтобы вы знали, что это возможно и как это сделать в (очень маловероятном) случае, если вы не найдете готовый к использованию в классе Коллекторы или других утилитах;

— это помогает понять терминологию и то, как работают сборщики, что поможет вам выбрать готовый к использованию.

Например, чтобы выбрать самый тяжелый ящик, мы могли бы использовать сборщик Collectors.maxBy(). (Ради полного раскрытия мы могли бы также использовать операцию max(), описанную в Java streams 22. FindAny, findFirst, max, min, но нам нужно чтобы подчеркнуть это, используя сборщик, сгенерированный одним из фабричных методов класса Collectors).

Ниже приведен пример использования сборщика Collectors.maxBy():

Box theHeaviest = Stream.of(new Box(5, "rhowed"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
      .collect(Collectors.maxBy(Comparator
                         .comparing(Box::getWeight)))
      .orElse(null);
  System.out.print(theHeaviest); 
                //prints: Box{weight=8, color='green'}

Нет гарантии, что элемент max будет найден (например, если поток пуст), поэтому операция возвращает результат, завернутый в объект Optional.

Обратите внимание, что эту реализацию нельзя использовать для поиска максимального элемента в параллельном потоке. Это может быть причиной использования операции max(), описанной в разделе Потоки Java 22. FindAny, findFirst, max, min, или для создания пользовательского коллектора. Также вполне может быть, что некоторые утилиты в Java JDK или сторонней библиотеке также имеют готовый к использованию сборщик.

Смысл в том, что прежде чем внедрять свой сборщик, попробуйте найти уже существующий. Вероятно, это было бы проще и надежнее.

Потенциальная проблема при реализации пользовательского сборщика для поиска минимального значения

На первый взгляд кажется, что кастомный сборщик для поиска самого легкого ящика будет выглядеть так — просто перевернув сравнение в аккумуляторе и объединителе:

BiConsumer<Box, Box> accumulatorMin = (b1, b2) -> {
     if(b1.getWeight() > b2.getWeight()){
         b1.setWeight(b2.getWeight());
         b1.setColor(b2.getColor());
     }
  };
  BiConsumer<Box, Box> combinerMin = (b1, b2) -> {
     System.out.print("Combiner is called!");
     if(b1.getWeight() > b2.getWeight()){
           b1.setWeight(b2.getWeight());
           b1.setColor(b2.getColor());
     }
  };
  Box theLightest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
     .collect(Box::new, accumulatorMin, combinerMin);
  System.out.print(theLightest); 
                  //prints: Box{weight=0, color='null'}

Это не работает! Почему? По-видимому, это связано с тем, что самое первое сравнение выполняется между объектом new Box() и одним из объектов Box в потоке. Объект new Box() имеет вес 0 и цвет null по умолчанию, поэтому ни один из объектов потока не имеет меньшего веса, поэтому результат.

Чтобы доказать это, давайте изменим реализацию Supplier (первый параметр) на следующую:

Box theLightest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
     .collect(() -> new Box(10, "whatever"), 
                          accumulatorMin, combinerMin);
  System.out.print(theLightest);  
                 //prints: Box{weight=3, color='blue'}

Намного лучше, не так ли? Единственная проблема в том, что для этого требуются расширенные знания о самом большом весе. Мы могли бы установить для него значение max int, и решение сработало бы.

Но ниже приведена более чистая реализация:

BiConsumer<Box, Box> accumulatorMin2 = (b1, b2) -> {
      if(b1.getWeight() == 0 || 
                  b1.getWeight() > b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  BiConsumer<Box, Box> combinerMin2 = (b1, b2) -> {
      if(b1.getWeight() == 0 || 
                b1.getWeight() > b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  Box theLightest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
     .collect(Box::new, accumulatorMin2, combinerMin2); 
  System.out.print(theLightest);                  
                 //prints: Box{weight=3, color='blue'}

Обратите внимание, как мы добавили проверку b1.getWeight() == 0 к накопителю и объединителю. Это позволило нам вернуться к new Box() в качестве значения, созданного Supplier.

Итак, это пример ловушки, в которую можно иногда попасть, предполагая, что симметричное решение может быть достигнуто простым переворачиванием сравнения или аналогичным простым изменением.

Чтобы защитить себя от таких ловушек, используйте модульное тестирование. О тестировании, его преимуществах и недостатках мы поговорим позже.

И, наконец, посмотрите, насколько проще и менее подвержена ошибкам реализация той же функциональности при использовании готового сборщика:

Box theLightest = Stream.of(new Box(5, "red"),
                            new Box(8, "green"),
                            new Box(3, "blue"))
     .collect(Collectors.
          minBy(Comparator.comparing(Box::getWeight))) 
     .orElse(null);
  System.out.print(theLightest);                     
                 //prints: Box{weight=3, color='blue'}

Как мы упоминали выше, эта реализация не позволяет обрабатывать параллельные потоки. Но на практике большинство разработчиков никогда не сталкиваются с ситуацией, когда нужно обработать параллельный поток.

В следующем посте мы продолжим обсуждение операции collect() и продемонстрируем использование готовых к использованию Collectors.minBy() и Collectors. .maxBy() более подробно.

Смотрите другие сообщения о потоках Java 8 и сообщения на другие темы.