Обработка плоского файла по частям с использованием нескольких потоков с использованием шаблона производитель/потребитель и SqlBulkCopy в БД SQL Server

Я надеюсь, что вы будете терпеть меня. Я хотел предоставить как можно больше информации. Основная проблема заключается в том, как создать структуру (например, стек), которая будет использоваться несколькими потоками, которые будут извлекать значение и использовать его для обработки одного большого плоского файла и, возможно, выполнять цикл снова и снова, пока весь файл не будет обработан. Если файл содержит 100 000 записей, которые могут быть обработаны 5 потоками с использованием 2 000 фрагментов строк, то каждый поток получит для обработки 10 фрагментов.

Моя цель - переместить данные в плоский файл (с заголовком... Подзаголовок... Деталь, Деталь, Деталь,... Деталь, Поднижний колонтитул, Подзаголовок... Деталь, Деталь, Деталь,... Деталь, Поднижний колонтитул, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer) в базу данных OLTP с режимом восстановления до Simple (возможно, Full) в 3 таблицы: 1-я представляет уникальный ключ подзаголовка, присутствующий в строке подзаголовка, 2-я промежуточная таблица SubheaderGroup, представляющая группировку строк сведений в кусках по 2000 записей (должен иметь PK идентификатора подзаголовка в качестве его FK и 3-й, представляющий строки сведений с FK, указывающим на PK подзаголовка.

Я занимаюсь ручным управлением транзакциями, так как у меня могут быть десятки тысяч строк сведений, и я использую специальное поле, для которого установлено значение 0 в таблицах назначения во время загрузки, а затем в конце обработки файла я выполняю транзакционное обновление, изменяя это значение 1, что может сигнализировать другому приложению о завершении загрузки.

Я хочу разбить этот плоский файл на несколько равных частей (одинаковое количество строк), которые можно обрабатывать несколькими потоками и импортировать с помощью SqlBulkCopy с использованием IDataReader, созданного из метаданных таблицы назначения).

Я хочу использовать шаблон производителя/потребителя (как объяснено в приведенной ниже ссылке - анализ PDF и пример кода), чтобы использовать SqlBulkCopy с параметром SqlBulkCopyOptions.TableLock. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Этот шаблон позволяет создавать несколько производителей, и эквивалентное количество потребителей должно подписаться на производителей для использования строки.

В проекте TestSqlBulkCopy, в файле DataProducer.cs, есть метод, имитирующий создание тысяч записей.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Этот метод будет выполняться в контексте нового потока. Я хочу, чтобы этот новый поток считывал только уникальный фрагмент исходного плоского файла, а другой поток начинал обработку следующего фрагмента. Затем потребители будут перемещать данные (которые перекачиваются к ним) в базу данных SQL Server с помощью класса SqlBulkCopy ADO.NET.

Итак, вопрос здесь заключается в том, что основная программа диктует, какая строка от строки до строки должна обрабатываться каждым потоком, и я думаю, что это должно происходить во время создания потока. Второе решение, вероятно, заключается в том, чтобы потоки разделяли некоторую структуру и использовали что-то уникальное для них (например, номер потока или порядковый номер) для поиска общей структуры (возможно, стека и извлечения значения (блокируя стек при этом), а затем следующий поток будет затем выберите следующее значение.Основная программа выберет плоский файл, определит размер фрагментов и создаст стек.

Итак, может ли кто-нибудь предоставить некоторые фрагменты кода, псевдокод того, как несколько потоков будут обрабатывать один файл и получать только уникальную часть этого файла?

Спасибо, Рэд


person Rad    schedule 14.01.2010    source источник
comment
Требуется много усилий, чтобы дублировать то, что SSIS делает из коробки.   -  person Remus Rusanu    schedule 14.01.2010


Ответы (1)


Что хорошо сработало для меня, так это использование очереди для хранения необработанной работы и словаря для отслеживания работы в процессе:

  1. Создайте рабочий класс, который принимает имя файла, начальную строку и количество строк и имеет метод обновления, выполняющий вставки в базу данных. Передайте метод обратного вызова, который рабочий использует, чтобы сигнализировать о завершении.
  2. Загрузите очередь с экземплярами рабочего класса, по одному для каждого фрагмента.
  3. Создайте поток диспетчера, который удаляет экземпляр рабочего процесса из очереди, запускает его метод обновления и добавляет экземпляр рабочего процесса в Dictionary с ключом ManagedThreadId своего потока. Делайте это до тех пор, пока не будет достигнуто максимально допустимое количество потоков, как указано в Dictionary.Count. Диспетчер ждет, пока завершится поток, а затем запускает другой. Есть несколько способов подождать.
  4. По завершении каждого потока его обратный вызов удаляет его ManagedThreadId из словаря. Если поток завершается из-за ошибки (например, превышение времени ожидания соединения), то обратный вызов может повторно вставить работника в очередь. Это хорошее место для обновления пользовательского интерфейса.
  5. Ваш пользовательский интерфейс может отображать активные потоки, общий прогресс и время для каждого фрагмента. Это может позволить пользователю настроить количество активных потоков, приостановить обработку, показать ошибки или остановить раньше.
  6. Когда очередь и словарь пусты, все готово.

Демонстрационный код как консольное приложение:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
person Ed Power    schedule 14.01.2010
comment
epower, Спасибо за подробный псевдокод. Не могли бы вы указать мне код, который делает что-то подобное. Где останется определение метода обратного вызова и ссылка на Dictionary и Queue? Я полагаю, что обратный вызов будет методом в основном потоке. Основной поток создает экземпляры Workers, Queue, порождает поток Dispatcher, которому, как я полагаю, передается Queue (созданная в классе основного потока) в качестве аргумента в его конструкторе. Я считаю, что Dictionary также создается в классе основного потока и также передается Dispatcher в его конструкторе. - person Rad; 15.01.2010
comment
Я предполагаю, что потребуется заблокировать Queue и Dictionary, потому что к ним обращается основной поток (когда вызывается обратный вызов) и Dispatcher. Я просто не знаю, как это работает: диспетчер ждет, пока поток завершится, а затем запускает другой, если обратный вызов находится в основном потоке. Как диспетчер узнает, что происходит в методе обратного вызова основного потока. Будет ли он объединять Dictionary для пустого места и понимать, что может удалить новое значение из очереди. Не могли бы вы предоставить некоторые детали реализации. Большое спасибо, Рэд - person Rad; 15.01.2010
comment
Я добавил пример приложения, чтобы продемонстрировать это. Вы правы в том, что вам нужно заблокировать коллекции. Код должен быть простым и функциональным, а не последним словом в многопоточном дизайне. Надеюсь, другие присоединятся к некоторым украшениям и [мягкой] критике. - person Ed Power; 16.01.2010
comment
Это отличный пример кода. Спасибо. Я заметил, что переменная activeThreadCount не используется. Я предполагаю, что это может быть изменено между моментом его установки и тем, когда нам снова нужно прочитать activeWork.Count в строке: if (activeWork.Count == maxThreads) break; . Thread.Sleep(200); line единственная незавершенная строка кода? Я пытаюсь понять, что делать в этой строке. - person Rad; 16.01.2010
comment
Нужно ли мне расширять область действия и сохранять коллекцию рабочих экземпляров (они привязаны к внутреннему циклу while), чтобы проверить, завершены ли какие-либо рабочие потоки. Нужно ли мне что-то добавлять (например, ManualResetEvent через ObjectState) в метод WorkerFinished, как это сделано здесь: http://74.125.95.132/search?q=cache:http://bytes.com/topic/c-sharp/answers/226317-working-example-waithandle-waitany-please чтобы сигнализировать основному потоку о продолжении удаления рабочих из очереди и создании новых потоков (до maxThreads) - person Rad; 16.01.2010
comment
@Rad - Хороший улов неиспользуемого activeThreadCount - я пропустил это, когда рефакторил пример. Thread.Sleep(200) не является незаконченным, это всего лишь один из способов для Dispatch приостановить выполнение некоторых потоков. Метод WorkerFinished удалит поток из коллекции activeWorkder, поэтому вам не нужно проверять его в другом месте. Вы можете использовать события сброса в WorkerFinished для замены Thread.Sleep; Я просто выбрал более простой путь, но он позволит одновременно завершить множество потоков, прежде чем Dispatch запустит больше. - person Ed Power; 17.01.2010
comment
Вы должны попробовать оба подхода и посмотреть, что работает лучше всего. Для начала измерьте такие параметры, как количество потоков в секунду и среднюю продолжительность потока. Вы также можете поместить LoadQueue в отдельный поток (с блокировкой), если хотите начать обработку воркеров во время заполнения очереди. - person Ed Power; 17.01.2010
comment
@ebpower. У вас есть потоки POWER. Спасибо. Я бы увеличил ваш голос, но мне не хватило очков. Извините, что беспокою вас, но я никогда не слышал о методе WorkerFinished. Не могли бы вы (всякий раз, когда вы найдете время — я просто слишком новичок в многопоточности, что у меня кружится голова), если бы вы отредактировали приведенный выше код, чтобы добавить свою идею вместе с LoadQueue, вызываемым в отдельном потоке, включая мои наблюдения о правильном использовании переменной activeThreadCount - person Rad; 23.01.2010
comment
(Я не знаю, как гарантировать, что он останется прежним до конца метода) и расширение области действия коллекции экземпляра рабочего процесса и как правильно использовать ручной сброс. У меня есть куча файлов, которые копируются в папку, и если я начну загружать очередь с новыми рабочими потоками, я могу преждевременно завершить программу. Моя консольная программа запускается извне, и она должна быть активна в течение 15 минут, и она должна обрабатывать столько файлов, сколько они прибывают. - person Rad; 23.01.2010
comment
Таким образом, какое бы количество файлов оно ни собрало за эти 15 минут, оно должно быть завершено этим экземпляром консольного приложения. Если встречается большой файл, он может работать намного дольше (даже в часах). Я хочу знать, как справиться с ситуацией, когда внешняя программа планирования снова запускает мое консольное приложение, как захватить только новые файлы, не поставленные в очередь первым экземпляром. Могу ли я каким-то образом запустить это приложение в цикле навсегда (как служба, я полагаю), или был бы способ, которым 2 экземпляра моих консольных приложений могли бы взаимодействовать с использованием методов внутри процесса для - person Rad; 23.01.2010
comment
выяснить, что будет обрабатывать первый экземпляр, чтобы второй экземпляр получал только необработанные файлы (которые первый не забрал в течение 15 минут). У меня есть последний вопрос: есть ли способ, которым после завершения обработки нескольких потоков и их объединения в конец основным потоком, который он все еще может выбрать в каждом состоянии потоков и обнаружить, что они собрали (я считаю, что это называется состоянием внутреннего потока). Я хочу использовать около 4-8 потоков для обработки большого файла со 100 000 строк и использовать SqlBulkCopy. - person Rad; 23.01.2010
comment
InputFile имеет разные типы строк (1), представляющие заголовок файла (идентификатор клиента), (3) начало пакета (уникальный идентификатор заказа), (5) сведения о заказе, (7) конец пакета (который содержит количество строк в пакете) и ( 9) является и файла. Единственная связь между и порядком и деталями заключается в том, что строки деталей следуют за (3) типом записи, и они зажаты между ней и (7) записью. Я хочу запустить каждый поток, который будет считывать X строк для обработки (скажем, 2000). Поток может выбрать попытку выйти за пределы предела +/- 10%, чтобы найти последнюю партию в целом (03), (5)… (5), (7) - person Rad; 23.01.2010
comment
а затем используйте шаблон Publisher/Subscriber для вставки родителя/детали в промежуточные таблицы (мне, вероятно, придется добавить столбец (OrderID к деталям, чтобы установить связь между родителем/потомком). StartOfFile_1 --- FirstBatch --- 3,5, 5,5,5, 3,5,5, 5,5, 3, 5,5, 5,7, 3,5,7, --- Вторая партия --- 3,5,5, 5,5, 5 ,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5,5 5,5, 5,5,----Третья партия----- 5,5, 5,5, 5,5,7, 3,5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5,7 ---- 9_EndOfFile Итак, моя проблема здесь заключается в том, что ThirdBatch не будет знать, где начался предыдущий, а SecondBatch может начать с типа строки, отличного от (3). - person Rad; 23.01.2010
comment
Таким образом, потоки каким-то образом должны взаимодействовать, чтобы обработать все строки в файле. Спасибо - person Rad; 23.01.2010