Процессор с 2 входящими краями - при возврате false на одном крае продолжайте повторную обработку с того же края и никогда не обрабатывайте новые элементы на другом крае

Я прошу подтверждения моего предположения о логике tryProcess ().

Подробно, как возвращаемое значение (истина / ложь) влияет на рабочий процесс DAG на процессоре с 2 входящими ребрами без указания приоритета.

Мое предположение состоит в том, что если у процессора есть входящие элементы для обоих краев, и один метод tryProcess () возвращает false, другой край будет обработан (если на таком крае доступно больше входящих элементов). Чередование входящих элементов в зависимости от того, какая кромка прекращает обработку, а какая принимает их.

Проблема

Иногда случается, что один экземпляр процессора блокируется на tryProcess (# 0), который всегда возвращает false (потому что мы ожидаем обработки нового элемента с другого края). tryProcess (# 0) вызывается повторно, а tryProcess (# 1) никогда не вызывается. Я уверен, что completeEdge () никогда не вызывается на процессоре ни для края №0, ни для края №1, поэтому я ожидаю, что будет больше элементов для обработки с края №1. Обычно это происходит после многократного выполнения одного и того же задания.

Чтобы лучше объяснить вопрос, это мой вариант использования:

Вариант использования

Моя модель данных состоит из следующего объекта

  • A: объект, идентифицированный атрибутом ida
  • B: объект, идентифицированный атрибутом «idb». В нем есть ссылка на A с использованием значения ida.
  • AB: объект, который связывает объект B и объект A, на который имеется ссылка

Мне нужно сопоставить объекты B с соответствующим объектом A, на который имеется ссылка, и испустить пару из них.

У меня есть DAG с такой настройкой:

Вершины

  • S-A: исходные элементы типа «A» (localParallelism (1), генерирует объекты A, отсортированные по атрибуту «ida»).
  • S-B: исходные элементы типа «B» (localParallelism (1), генерирует объекты B, отсортированные по указанному атрибуту «ida»).
  • C-AB: процессор, который сопоставляет объекты B с объектом A, на который имеется ссылка (излучает объекты AB).

Связи

  • S-A -> C-AB: входящий край # 0 (приоритет не указан, разделен атрибутом "ida")
  • S-B -> C-AB: входящий край # 1 (приоритет не указан, разделен по ссылке на атрибут "ida")

Окружающая среда состоит из кластера струи из орехового литья с 2 узлами.

Логика

Процессор C-AB получает первый объект «A» (с ребра № 0) и сохраняет его до тех пор, пока не будут обработаны все объекты «B», относящиеся к этому объекту «A». Если он получает другой объект «A», он возвращает false в tryProcess (# 0).

Пока он получает объекты «B» (от края №1), которые соответствуют текущему «A», он излучает «AB».

Когда процессор получает объект "B" со ссылкой на следующий объект "A", он отбрасывает текущий объект "A" и ждет следующего.

Если он получает объекты «B» до того, как будет получен объект «A», на который имеется ссылка, дождитесь, пока правильный «A» не будет соответствовать, возвращая false в tryProcess (# 1), если получен новый «B».

Это должно работать, потому что S-A и S-B испускают правильно отсортированные объекты, а края правильно разделены для отправки объектов с одинаковыми значениями ida на один и тот же процессор.


person Lubbo    schedule 12.07.2018    source источник


Ответы (1)


Мое предположение состоит в том, что если у процессора есть входящие элементы для обоих краев, и один метод tryProcess () возвращает false, другое ребро будет обработано (если на таком крае доступно больше входящих элементов).

Это предположение неверно. Поведение процессора эквивалентно

for (Object item : inbox) process(item);

но реализован с совместной многопоточностью, поэтому этот цикл должен иметь возможность «приостанавливать» себя. Мы добиваемся приостановки, позволяя tryProcess() вернуть false.

Механизм выполнения всегда возобновляет работу процессора с того места, где он остановился, и не пытается обрабатывать какие-либо другие элементы до тех пор, пока почтовый ящик не будет очищен. Сам почтовый ящик содержит набор элементов, взятых из входной очереди, а не все элементы, которые край будет передавать в течение всего задания.

Единственный механизм, предлагаемый Jet для решения проблемы взаимозависимости между ребрами, - это приоритет ребра. Если вам нужно что-то более детализированное, ваш процессор должен принять все элементы и буферизовать их внутренне, пока ваше условие выполнения не будет выполнено.

person Marko Topolnik    schedule 12.07.2018
comment
Спасибо, Марко! Затем я бы попытался переместить логику сопоставления элементов в другое место ... возможно, в исходную вершину. Буферизация может быть слишком тяжелой для памяти ... Попробую ... - person Lubbo; 12.07.2018
comment
Я думаю, что это ясно в javadoc tryProcess: может выбрать обработку только частично и вернуть false, и в этом случае он будет вызван снова позже с той же комбинацией (порядковый номер, элемент). (курсив добавлен) - person Oliv; 12.07.2018
comment
@Oliv OTOH нет письменной гарантии, что, например, tryProcess1() не будет вызываться тем временем, после того как tryProcess0() вернет false. - person Marko Topolnik; 12.07.2018