Поток данных TPL, альтернатива ограничениям JoinBlock?

Я ищу альтернативу JoinBlock, с которой можно связать n-TransformBlocks и объединить/объединить сообщения всех исходных блоков TransformBlock вместе, чтобы передать их коллекцию в другой блок потока данных.

JoinBlock отлично справляется со своей задачей, но он ограничен подключением до 3 исходных блоков. Он также страдает от множества недостатков (очень медленное соединение четных типов значений (целых) двух исходных блоков). Есть ли способ вернуть задачи из TransformBlocks и подождать, пока все TransformBlocks не получат завершенную задачу для передачи, прежде чем принимать Task<item>?

Есть альтернативные идеи? У меня потенциально есть 1-20 таких блоков преобразования, элементы которых мне нужно объединить, прежде чем передавать объединенную коллекцию элементов. Каждый блок преобразования гарантированно возвращает ровно один элемент вывода для каждого «преобразованного» элемента ввода.

Изменить: Запрошено разъяснение:

В соответствии с одним из моих предыдущих вопросов я настроил свои JoinBlocks следующим образом:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
    Console.ReadLine();
}

person Matt    schedule 02.12.2012    source источник
comment
«Есть ли способ вернуть задачи из блоков TransformBlocks и подождать, пока все блоки TransformBlock не получат завершенную задачу для передачи, прежде чем принимать Task<item>?» Я не уверен, почему вы думаете, что это поможет, но блоки TDF так не работают. Вы либо принимаете предмет, либо нет, вы не можете взять предмет и решить принять его через какое-то время.   -  person svick    schedule 02.12.2012


Ответы (2)


Один из способов сделать это — использовать BatchBlock с Greedy, установленным на false. В этой конфигурации блок ничего не делает, пока не будет n элементов из n разных блоков, ожидающих его использования (где n — это число, которое вы установили при создании BatchBlock). Когда это происходит, он потребляет все n элементов одновременно и создает массив, содержащий все элементы.

Одним из предостережений в этом решении является то, что результирующий массив не отсортирован: вы не узнаете, какой элемент из какого источника. И я понятия не имею, как его производительность сравнивается с JoinBlock, вам придется проверить это самостоятельно. (Хотя я бы понял, если бы использование BatchBlock таким образом было медленнее из-за накладных расходов, необходимых для нежадного потребления.)

person svick    schedule 02.12.2012
comment
Я протестирую его и сообщу о производительности относительно блока соединения. Однако меня также интересуют альтернативные решения, отличные от TDF. - person Matt; 02.12.2012
comment
некоторые отзывы о BatchBlock: вы были правы, пакетный блок в нежадном режиме (требуется для получения всех элементов, а не только первого доступного) значительно замедляет работу. Я решил решить эту проблему вне TDF. Я все еще хочу запускать функцию в каждом предыдущем блоке преобразования параллельно, а затем объединять результаты обычным способом, а не через TDF. Но спасибо за указатели на batchBlock. - person Matt; 03.12.2012
comment
@svick - Хорошее перепрофилирование BatchBlock, чтобы компенсировать ограниченное количество выходных кортежей для JoinBlock. - person Robert Oschler; 08.04.2013

Если вы хотите выполнить несколько параллельных операций для каждого элемента, IMHO имеет смысл выполнять эти операции внутри одного блока, вместо того, чтобы разбивать их на несколько блоков, а затем снова пытаться объединить независимые результаты в один объект. Итак, мое предложение состоит в том, чтобы сделать что-то вроде этого:

var block = new TransformBlock<MyClass, MyClass>(async item =>
{
    Task<SomeType1> task1 = Task.Run(() => CalculateProperty1(item.Id));
    Task<SomeType2> task2 = Task.Run(() => CalculateProperty2(item.Id));
    await Task.WhenAll(task1, task2).ConfigureAwait(false);
    item.Property1 = task1.Result;
    item.Property2 = task2.Result;
    return item;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

В приведенном выше примере элементы типа MyClass передаются через TransformBlock. Свойства Property1 и Property2 каждого элемента вычисляются параллельно с использованием отдельного Task для каждого свойства. Затем ожидаются обе задачи, и когда обе задачи выполнены, результаты присваиваются свойствам элемента. Наконец, обработанный элемент возвращается.

Единственное, что вы хотите знать при таком подходе, это то, что степень параллелизма будет продуктом внутренних параллельных операций и MaxDegreeOfParallelism параметра блока. Таким образом, в приведенном выше примере степень параллелизма будет 2 x 2 = 4. Точнее, это будет максимальная степень параллелизма, поскольку возможно, что одно из двух внутренних вычислений будет медленнее другого. Таким образом, в любой момент фактическая степень параллелизма может быть между 2 и 4.

person Theodor Zoulias    schedule 11.06.2020