Hazelcast/Coherence Grid Computing EntryProcessor с данными для каждого ключа

Я хочу использовать hazelcast или coherence EntryProcessor для обработки некоторой логики при параллельном выполнении на разных узлах, где ключи хранятся в кеше. Я вижу, что могу использовать что-то вроде sendToEachKey (процесс EntryProcessor).

Моя проблема возникает, когда мне нужно отправить также с логикой часть данных для обработки, которая принадлежит другой системе, и я получаю ее (например, в HTTP-запросе).

Конечно, я могу сделать что-то вроде sendToEachKey (процесс EntryProcessor (data)). Но если данные разные для каждого ключа, и я хочу отправить на конкретный ключ только его данные для обработки, как я могу это сделать? Почему я хочу это сделать, так это потому, что данные слишком велики, и у меня перегрузка сети.

Конечно, если я открою пул потоков для отправки всех данных каждому ключу, это возможно, но это неэффективно из-за огромных запросов.

Спасибо!


person Patrick    schedule 18.10.2016    source источник


Ответы (3)


Для Hazelcast вы можете получить все значения и отправить каждому ключу свой собственный EntryProcessor, однако это создаст много накладных расходов.

Другой вариант — использовать комбинацию EntryProcessor и нашего распределенного ExecutorService.

Вы отправляете Runnable в ExecutorService. Внутри Runnable вы извлекаете локальный набор ключей, извлекаете все внешние значения (все, что уже является локальным для узла), а затем выдаете один EntryProcessor для каждого локального ключа. Поскольку вы уже находитесь локально на узле, трафик больше не летает (кроме резервных копий, разумеется :)). Тем не менее, вы можете захотеть реализовать определенный EntryProcessor, который передает только измененное значение, но не весь процессор (чтобы сэкономить еще больше трафика).

person noctarius    schedule 18.10.2016
comment
Этот. Я пришел сюда, чтобы написать ту же рекомендацию. IExecutorService.submitToKeyOwner(Callable/Runnable, key) можно использовать для простого решения. - person A.K.Desai; 18.10.2016
comment
Спасибо! Это то, что я думаю, может быть лучшим. - person Patrick; 19.10.2016

В Coherence вы можете использовать PartitionedService, чтобы найти связь ключей кэша с элементами кластера. Затем вы можете вызвать процессор ввода с данными для каждого члена, используя PartitionedFilter, чтобы убедиться, что данные отправляются только этому члену. Что-то вроде этого:

// keys in this map are also keys in cache
void processData(Map<String, Data> externalData) { 
    PartitionedService partitionedService = (PartitionedService) cache.getCacheService();
    Map<Member, Map<String, Data>> dataForMembers = splitDataByMembers(partitionedService, externalData);

    for (Entry<Member, Map<String, Data>> dataForMember : dataForMembers.entrySet()) {
        Member member = dataForMember.getKey();
        Map<String, Data> data = dataForMember.getValue();

        PartitionSet partitions = partitionedService.getOwnedPartitions(member);
        PartitionedFilter filter = new PartitionedFilter<>(Filters.always(), partitions);
        EntryProcessor processor = new MyEntryProcessor(data);
        cache.async().invokeAll(filter, processor);
    }
}

Map<Member, Map<String, Data>> splitDataByMembers(
        PartitionedService partitionedService,
        Map<String, Data> externalData) {
    Map<Member, Map<String, Data>> dataForMembers = new HashMap<>();

    for (Object member : partitionedService.getInfo().getServiceMembers()) {
        dataForMembers.put((Member) member, new HashMap<>());
    }
    for (Entry<String, Data> dataForKey : externalData.entrySet()) {
        Member member = partitionedService.getKeyOwner(dataForKey.getKey());
        dataForMembers.get(member).put(dataForKey.getKey(), dataForKey.getValue());
    }
    return dataForMembers;
}

Таким образом, для каждого члена кластера будет только один вызов входного процессора, и каждый член получит только те данные, которые ему интересны.

Я использовал String в качестве ключа кэша и произвольный тип Data для данных, связанных с этим ключом, но вы, конечно, можете использовать любые другие типы (и вам вообще не нужно моделировать внешние данные как карту).

person Krzysztof Kosmatka    schedule 19.10.2016

В Hazelcast вы будете делать executeOnKeys(keys, new EntryProcessor(data)), а это слишком много, так как данные слишком велики.

Почему нет

executeOnKey(key1, new EntryProcessor(data1));
executeOnKey(key2, new EntryProcessor(data2));
executeOnKey(key3, new EntryProcessor(data3));

для отправки подмножества данных, которое необходимо каждому ключу?

person Neil Stevenson    schedule 18.10.2016