Я ищу альтернативу JoinBlock, с которой можно связать n-TransformBlocks и объединить/объединить сообщения всех исходных блоков TransformBlock вместе, чтобы передать их коллекцию в другой блок потока данных.
JoinBlock отлично справляется со своей задачей, но он ограничен подключением до 3 исходных блоков. Он также страдает от множества недостатков (очень медленное соединение четных типов значений (целых) двух исходных блоков). Есть ли способ вернуть задачи из TransformBlocks и подождать, пока все TransformBlocks не получат завершенную задачу для передачи, прежде чем принимать Task<item>
?
Есть альтернативные идеи? У меня потенциально есть 1-20 таких блоков преобразования, элементы которых мне нужно объединить, прежде чем передавать объединенную коллекцию элементов. Каждый блок преобразования гарантированно возвращает ровно один элемент вывода для каждого «преобразованного» элемента ввода.
Изменить: Запрошено разъяснение:
В соответствии с одним из моих предыдущих вопросов я настроил свои JoinBlocks следующим образом:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
Task<item>
?» Я не уверен, почему вы думаете, что это поможет, но блоки TDF так не работают. Вы либо принимаете предмет, либо нет, вы не можете взять предмет и решить принять его через какое-то время. - person svick   schedule 02.12.2012