Введение: параллельная потоковая обработка

Одно из заявленных намерений при разработке Java 8 Streams API заключалось в том, чтобы лучше использовать преимущества многоядерной вычислительной мощности современных компьютеров. Операции, которые могут выполняться с одним линейным потоком значений, также могут выполняться параллельно путем разделения этого потока на несколько подпотоков и объединения результатов обработки каждого подпотока по мере их появления.

Например, предположим, что мы хотим вычислить хэш большого массива значений String. Самый простой способ — использовать Arrays.deepHashCode:

int hash = Arrays.deepHashCode(strings);

но мы также можем собрать хэши всех String в массиве в массив int, а затем взять хэш этого:

int hash = Arrays.hashCode(Arrays.stream(strings).mapToInt(String::hashCode).toArray());

Сделав это, мы можем затем распараллелить операцию хеширования по всем потокам в Java по умолчанию ForkJoinPool, сделав поток параллельным:

int hash = Arrays.hashCode(Arrays.stream(strings).parallel().mapToInt(String::hashCode).toArray());

Вот тест, который использует библиотеку тестирования производительности Contiperf для сбора некоторых показателей по этим трем методам:

public class PerformanceTest {
    @Rule
    public final ContiPerfRule contiperfRule = new ContiPerfRule();
    private static final String[] strings = IntStream.range(0, 1000000)
            .mapToObj(i -> randomString())
            .toArray(String[]::new);
    private static String randomString() {
        Random random = ThreadLocalRandom.current();
        byte[] bytes = new byte[random.nextInt(100) + 1];
        random.nextBytes(bytes);
        return new String(bytes);
    }
    @Test
    @PerfTest(invocations = 10000, warmUp = 10000)
    public void hashWithDeepHashCode() {
        Arrays.deepHashCode(strings);
    }
    @Test
    @PerfTest(invocations = 10000, warmUp = 10000)
    public void hashWithLinearStream() {
        Arrays.hashCode(Arrays.stream(strings).mapToInt(String::hashCode).toArray());
    }
    @Test
    @PerfTest(invocations = 10000, warmUp = 10000)
    public void hashWithParallelStream() {
        Arrays.hashCode(Arrays.stream(strings).parallel().mapToInt(String::hashCode).toArray());
    }
}

и вот результаты для трех подходов:

com.opencredo.eventhose.PerformanceTest.hashWithParallelStream
samples: 8536
max:     19
average: 6.018158388003749
median:  6
com.opencredo.eventhose.PerformanceTest.hashWithDeepHashCode
samples: 9234
max:     29
average: 10.06670998483864
median:  11
com.opencredo.eventhose.PerformanceTest.hashWithLinearStream
samples: 9262
max:     40
average: 12.909090909090908
median:  13

Как и следовало ожидать, линейное преобразование массива Strings в массив ints, а затем его хеширование примерно на 20% медленнее, чем просто использование Arrays.deepHashCode. Но когда мы распараллеливаем хэширование каждого отдельного String, мы видим значительное ускорение, поскольку в игру вступает больше ядер ЦП, каждое из которых обрабатывает подмножество значений в нашем исходном массиве.

Однако мы по-прежнему ограничены ресурсами ЦП и памяти, доступными нам на одной машине. Что, если бы мы могли распараллелить задачи потоковой обработки в кластере машин?

Хейзелкаст Джет

Hazelcast Jet создает два слоя поверх сетки распределенных данных Hazelcast. Первый отображает DAG (направленный ациклический граф), представляющий поток вычислений в сетке, предоставляя универсальную модель обработки для распределенного потока данных. Данные предоставляются источниками, потребляются приемниками и преобразуются процессорами, которые действуют как приемники и источники. Эта модель должна быть знакома всем, кто работал со Spark, Spring XD или Apache Storm, и я не буду здесь ее подробно обсуждать — есть хорошее введение на сайте Hazelcast Jet.

Второй уровень отображает вычисление, определенное с помощью Java 8 Streams API, в DAG. С некоторыми небольшими изменениями мы можем запустить тот же код, что и раньше, и распределить его по кластеру экземпляров Hazelcast:

public class JetTest {
    private static final String[] strings = IntStream.range(0, 1000000)
            .mapToObj(i -> randomString())
            .toArray(String[]::new);
    private static String randomString() {
        Random random = ThreadLocalRandom.current();
        byte[] bytes = new byte[random.nextInt(100) + 1];
        random.nextBytes(bytes);
        return new String(bytes);
    }
    private JetInstance jet1;
    private JetInstance jet2;
    private JetInstance jet3;
    private JetInstance jet4;
    @Before
    public void startJet() {
        jet1 = Jet.newJetInstance();
        jet2 = Jet.newJetInstance();
        jet3 = Jet.newJetInstance();
        jet4 = Jet.newJetInstance();
    }
    @After
    public void stopJet() {
        jet1.shutdown();
        jet2.shutdown();
        jet3.shutdown();
        jet4.shutdown();
    }
    @Test
    public void hashInJet() {
        IStreamList<String> input = jet1.getList("strings");
        Arrays.stream(strings).forEach(input::add);
        int hash = Arrays.hashCode(input.stream()
                .mapToInt(String::hashCode)
                .toArray());
        System.out.println(hash);
    }
}

Возможно, неудивительно, что, учитывая работу, необходимую для координации работы внутри кластера и передачи сериализованных данных и байт-кода на каждый узел, который будет их выполнять, это намного медленнее, чем выполнение того же самого в памяти в рамках одного процесса. Однако возможность повторно использовать известную модель программирования для определения распределенных вычислений очень привлекательна. Давайте дадим ему что-нибудь более интересное, чем хеш-строки.

Воспроизведение событий

Предположим, у нас есть поток событий из многих источников, чередующихся и, возможно, неупорядоченных, и мы хотим преобразовать его в карту упорядоченных по времени историй событий, сгруппированных по источникам событий. Другими словами, учитывая:

At 3pm, Lightbulb 1 was switched on.
At 1pm, Lightbulb 2 was switched off.
At 2pm, Lightbulb 1 was plugged in.
At 11am, Lightbulb 2 was plugged in.
At 10am, Lightbulb 3 was plugged in.
At 12pm, Lightbulb 2 was switched on...

мы хотели бы иметь

Lightbulb 1:
    2pm: plugged in
    3pm: switched on
Lightbulb 2:
    11am: plugged in
    12pm: switched on
    1pm: switched off
Lightbulb 3:
    10am: plugged in...

Это можно выразить с помощью Streams API следующим образом:

Map<AggregateId, SortedSet<Event>> eventHistories = events.stream()
        .collect(groupingBy(
            Event::getAggregateId,
            toCollection(() -> TreeSet<Event>(eventTimestampComparator)));

Имея функцию, которая берет историю событий одной лампочки и возвращает текущее состояние лампочки, мы можем преобразовать наш беспорядочный поток событий во что-то еще более полезное: поиск по ключу/значению, дающий текущее состояние каждой лампочки:

IMap<AggregateId, LightbulbState> eventHistories = events.stream()
        .collect(groupingByToIMap(
            Event::getAggregateId,
            collectingAndThen(
                toCollection(() -> TreeSet<Event>(eventTimestampComparator)),
                eventHistory -> stateModel.getCurrentState(eventHistory));

Обратите внимание на изменение подписи с Map на IMap (распределенная карта ключей/значений Hazelcast), что означает, что результаты будут храниться в карте, распределенной по всей сетке Hazelcast. Это означает, что вместо того, чтобы сериализовать все результаты обратно клиенту и собирать их в локальный Map, Hazelcast может хранить результаты каждого вычисления в том разделе сетки данных, где вычисление было выполнено. Это потенциально может быть очень полезным в системе поиска событий, где мы хотим иметь быстрый кэшированный поиск текущих состояний агрегатов, особенно если мы хотим, чтобы кэш был распределен по сетке серверов.

Поскольку API Streams определяет вычисления как завершающиеся операциями сбора — аналогично этапу «уменьшения» в обработке сопоставления/уменьшения — кажется, что использование Hazelcast Jet с этим API лучше всего подходит для пакетных вычислений над источником потоковых данных, а не для непрерывного потока. операции обработки, такие как вычисление скользящего среднего или обнаружение аномальных закономерностей событий в скользящем временном окне. Однако по первому впечатлению Hazelcast Jet предоставляет знакомый и удобный программный API для мощного механизма распределенных вычислений, который может заменить Spark или Storm в некоторых случаях использования.

Посетите opencredo.com/blogs, чтобы прочитать больше!