Объединение задач в цепочку с продолжением и последующим запуском параллельной задачи

Рабочий процесс параллельных задач

http://i.stack.imgur.com/luRnu.png

Я надеюсь получить помощь по проблеме, с которой я столкнулся. Итак, проблема в том, что я запускаю параллельные задачи для поиска файлов в папках. Каждая задача включает в себя идентификацию файлов и добавление их в массив файлов. Затем дождитесь завершения каждой задачи, чтобы файлы были собраны, а затем выполните сортировку результатов. Затем обработайте отсортированный файл независимо, запустив по одной задаче для каждого файла, чтобы прочитать его и получить обратно соответствующий шаблон. Заключительный этап - объединить все результаты в удобочитаемом формате и отобразить их в удобном для пользователя виде.

Итак, вопрос в том, что я хочу правильно связать задачи, чтобы не блокировать поток пользовательского интерфейса. Я хотел бы иметь возможность отменить все на любом этапе программы.

Подвести итог:

Этап 1: Найдите файлы с помощью поиска в папках. Каждая задача выполняет рекурсивный поиск в дереве папок.

Этап 2: отсортируйте все найденные файлы и удалите дубликаты.

Этап 3: Запустите новые задачи для независимой обработки файлов. Каждая задача открывает файл и ищет соответствующий шаблон.

Этап 4: объединить результаты поиска по каждому файлу в один гигантский набор результатов и сделать его удобным для чтения человеком.

     List<Task> myTasks = new List<Task>();

// ==== stage 1 ======
        for(int i = 0; i < 10; i++) {
           string directoryName = directories[i];

           Task t = new Task(() =>
           {
              FindFiles(directoryName);
           });

           myTasks.Add(t);
            t.Start();
        }

// ==== stage 2 ====
        Task sortTask = Task.Factory.ContinueWhenAll(myTasks.ToArray(), (t) =>
        {
           if(_fileResults.Count > 1) {
              // sort the files and remove any duplicates
           }
        });

        sortTask.Wait();

// ==== stage 3 ====
        Task tt = new Task(() =>
        {
             Parallel.For(0, _fileResults.Count, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = token, TaskScheduler = _taskScheduler },
                    (i, loopstate) => {
              // 1. open file
              // 2. read file
              // 3. read file line by line
              }
        }

// == stage 4 === 
        tt.ContinueWith((t) =>
        {
           // 1. aggregate the file results into one giant result set
           // 2. display the giant result set in human readable format
        }, token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.FromCurrentSynchronizationContext());

      tt.start();

comment
Вы никогда не должны использовать конструктор Task. Если вы хотите создать задачу, представляющую выполнение делегата, используйте Task.Run для создания и запуска такой задачи.   -  person Servy    schedule 05.01.2016
comment
Вы не должны заставлять каждую из этих задач изменять List, чтобы добавить свои результаты. List не является потокобезопасным, поэтому это не обязательно сработает. Каждая задача должна возвращать значение, которое она создает при вычислении своего результата, и иметь доступ к продолжению (ям), полученным в результате, через Task API.   -  person Servy    schedule 05.01.2016
comment
Я использую блокировку и блокировку, чтобы заблокировать общий объект. Общий объект, являющийся набором результатов.   -  person P0ppy    schedule 05.01.2016
comment
Нет, в опубликованном вами коде вас нет. И даже в этом случае вы не должны этого делать. Вы должны установить для каждой задачи значение результата, которое она вычисляет. Он представляет семантику намного лучше, он позволяет модулировать и эффективно анализировать код изолированно, а также он будет быстрее, поскольку вы не вводите зависимости между каждой из задач без необходимости.   -  person Servy    schedule 05.01.2016
comment
Я согласен с твоей точкой зрения. Есть шанс, что я изменю его соответствующим образом, как только выясню, как связать задачи вместе. Основная проблема, с которой я столкнулся, - это выяснить, как правильно связать задачи вместе. Вы видели мою блок-схему - i.stack.imgur.com/luRnu.png   -  person P0ppy    schedule 05.01.2016


Ответы (2)


Не ждите синхронно завершения какой-либо задачи. Если какая-либо из этих операций должна выполняться после ранее созданной задачи, вместо этого добавьте эту работу в качестве продолжения этой задачи.

person Servy    schedule 05.01.2016
comment
Я понял, что. Но цель состоит в том, чтобы все задачи завершили поиск, чтобы я мог отсортировать окончательный набор результатов. - person P0ppy; 05.01.2016
comment
@EricLiu И способ сделать это - выполнить любую нужную работу, используя эти значения в качестве продолжения задач, создающих промежуточные значения, а не синхронно блокируя поток пользовательского интерфейса. - person Servy; 05.01.2016
comment
Не могли бы вы уточнить на небольшом примере. Очень признателен. - person P0ppy; 05.01.2016
comment
@EricLiu Вы уже показали, что знаете, как добавить продолжение к задаче в коде OP. Просто сделайте это для всех 3-х операций продолжения, которые у вас есть, вместо того, чтобы делать это для 2-х из 3-х и синхронно блокировать одну из них. Если вы хотите увидеть пример, просто посмотрите на свой собственный код для последнего этапа. - person Servy; 05.01.2016
comment
Что-то вроде этого ? var a = new Task (() = ›System.Diagnostics.Debug.Print (start)). ContinueWith ((t) =› System.Diagnostics.Debug.Print (t.Id.ToString () + completed)). ContinueWith ((t) = ›System.Diagnostics.Debug.Print (t.Id.ToString () + завершено); - person P0ppy; 05.01.2016
comment
@EricLiu Это один из способов добавить продолжение, да. - person Servy; 05.01.2016

Думали ли вы об использовании функции async/await? Судя по вашему вопросу, она идеально подходит для ваших нужд. Вот быстрая попытка решить вашу проблему с его помощью:

try
{
    List<Task<File[]>> stage1Tasks = new List<Task<File[]>>();

    // ==== stage 1 ======
    for (int i = 0; i < 10; i++)
    {
        string directoryName = directories[i];

        Task<File[]> t = Task.Run(() =>
        {
            return FindFiles(directoryName);
        },
            token);

        stage1Tasks.Add(t);
    }

    File[][] files = await Task.WhenAll(stage1Tasks).ConfigureAwait(false);

    // Flatten files.
    File[] _fileResults = files.SelectMany(x => x).ToArray();

    // ==== stage 2 ====
    Task<File[]> sortFilesTask = Task.Run(() =>
    {
        if (_fileResults.Count > 1)
        {
            // sort the files and remove any duplicates
            return _fileResults.Reverse().ToArray();
        }
    },
        token);

    File[] _sortedFileResults = await sortFilesTask.ConfigureAwait(false);

    // ==== stage 3 ====
    Task<SomeResult[]> tt = Task.Run(() =>
    {
        SomeResult[] results = new SomeResult[_sortedFileResults.Length];
        Parallel.ForEach(_sortedFileResults,
            new ParallelOptions {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                CancellationToken = token,
                TaskScheduler = _taskScheduler
            },
            (i, loopstate) =>
            {
                // 1. open file
                // 2. read file
                // 3. read file line by line
                results[i] = new SomeResult( /* here goes your results for each file */);
            });
        return results;
    },
        token);

    SomeResult[] theResults = await tt.ConfigureAwait(false);


    // == stage 4 === 
    // 1. aggregate the file results into one giant result set
    // 2. display the giant result set in human readable format
    // ....

}
catch (TaskCanceledException)
{
    // some task has been cancelled...
}
person jamespconnor    schedule 07.01.2016
comment
Я также хотел бы иметь возможность использовать cancellationToken для выхода из программы на любом этапе. Время от времени между задачами я хочу вернуться к потоку пользовательского интерфейса, чтобы обновить ход выполнения. - person P0ppy; 07.01.2016
comment
Тем не менее, я собираюсь попробовать ваш код. Я не знаком с async, это кажется многообещающим. Если я застряну, я дам вам знать. Спасибо, Джеймс. - person P0ppy; 07.01.2016
comment
моя переменная (определенная вне поля зрения в моем примере) с именем token - это CancellationToken, которую вы можете использовать для отмены любой из задач, которые вы видите. После этого будет запущен блок catch. - person jamespconnor; 08.01.2016