Сделайте шаг в архитектуре программы и узнайте, как найти практическое решение реальной бизнес-проблемы с помощью NodeJS Streams из этой статьи.

В обход: механика жидкости

Одна из самых сильных сторон программного обеспечения заключается в том, что мы можем разрабатывать абстракции, которые позволяют нам рассуждать о коде и манипулировать данными понятными нам способами. Потоки - один из таких классов абстракции.

В простой механике жидкости концепция линии тока полезна для рассуждений о том, как будут двигаться частицы жидкости, и об ограничениях, применяемых к ним в различных точках системы.

Например, предположим, что вода равномерно течет по трубе. На полпути к трубе он разветвляется. Как правило, поток воды равномерно распределяется по каждой ветви. Инженеры используют абстрактную концепцию линии обтекания, чтобы рассуждать о свойствах воды, таких как скорость потока, для любого количества ответвлений или сложных конфигураций трубопроводов. Если вы спросите инженера, какой, по его предположениям, будет скорость потока через каждую ветвь, он правильно ответит «половина», интуитивно. Математически это расширяется до произвольного числа линий тока.

Концептуально потоки должны кодировать то, что является слишком плавной механикой. Мы можем рассуждать о данных в любой момент, рассматривая их как часть потока. Вместо того, чтобы беспокоиться о деталях реализации между тем, как они хранятся. Возможно, вы могли бы обобщить это до некоторой универсальной концепции конвейера, который мы можем использовать между дисциплинами. На ум приходит воронка продаж, но это не так, и мы рассмотрим ее позже. Лучший пример потоков, с которым вы обязательно должны ознакомиться, если у вас еще нет каналов UNIX:

cat server.log | grep 400 | less

Мы ласково называем символ | трубкой. В зависимости от его функции мы объединяем выходные данные одной программы в качестве входных данных другой программы. Эффективная настройка конвейера.

(Кроме того, это похоже на трубу.)

Если вы похожи на меня и задаетесь вопросом, почему это необходимо, спросите себя, почему мы используем конвейеры в реальной жизни. По сути, это структура, исключающая хранение между точками обработки. Нам не нужно беспокоиться о хранении баррелей нефти, если она перекачивается.

Поймите это в программном обеспечении. Умные разработчики и инженеры, написавшие код для конвейерной передачи данных, настроили его так, чтобы он никогда не занимал слишком много памяти на машине. Независимо от того, насколько велик файл журнала выше, он не повесит терминал. Вся программа представляет собой процесс, обрабатывающий бесконечно малые точки данных в потоке, а не их контейнеры. Файл журнала никогда не загружается в память сразу, а загружается в управляемые части.

Я не хочу изобретать велосипед здесь. Итак, теперь, когда я рассмотрел метафору потоков и обоснование их использования, у Flavio Copes есть отличная запись в блоге, в которой рассказывается, как они реализованы в Node. Уделите столько времени, сколько вам нужно, чтобы охватить там основы, а когда будете готовы, возвращайтесь, и мы рассмотрим вариант использования.

Ситуация

Итак, теперь, когда у вас есть этот инструмент в вашем арсенале инструментов, представьте себе это:

Вы находитесь на работе, и ваш менеджер / юрист / отдел кадров / ваш клиент / (вставьте здесь заинтересованное лицо) обратился к вам с проблемой. Они слишком долго просматривают структурированные PDF-файлы. Конечно, обычно люди не говорят вам ничего подобного. Вы услышите: «Я трачу 4 часа на ввод данных». Или «Я просматриваю таблицы цен». Или: «Я заполняю правильные формы, чтобы мы получали карандаши с фирменной символикой нашей компании каждый квартал».

Как бы то ни было, если их работа включает и (а) чтение структурированных документов PDF, и (б) массовое использование этой структурированной информации. Затем вы можете вмешаться и сказать: «Эй, мы могли бы автоматизировать это и освободить ваше время для работы над другими делами».

Итак, для этой статьи давайте придумаем фиктивную компанию. Там, где я родом, термин «пустышка» относится либо к идиоту, либо к детской соске. Итак, давайте представим себе эту фальшивую компанию, производящую пустышки. Пока мы делаем это, давайте прыгнем на акулу и скажем, что они напечатаны на 3D-принтере. Компания действует как этичный поставщик пустышек для нуждающихся, которые не могут себе позволить покупать товары премиум-класса.

(Я знаю, как это глупо, пожалуйста, прекратите свое недоверие.)

Тодд закупает материалы для печати, которые входят в продукцию DummEth, и должен убедиться, что они соответствуют трем ключевым критериям:

  • они сделаны из пищевого пластика, чтобы сохранить здоровье младенцев,
  • они дешевые, для экономичного производства и
  • они подобраны как можно ближе, чтобы поддержать маркетинговую копию компании, в которой говорится, что их цепочка поставок также этична и загрязняет как можно меньше.

Проэкт

Так что мне легче следовать, я создал репозиторий GitLab, который вы можете клонировать и использовать. Убедитесь, что ваши установки Node и NPM также обновлены.

Базовая архитектура: ограничения

Итак, что мы пытаемся сделать? Предположим, что Тодд хорошо работает с электронными таблицами, как и многие офисные работники. Чтобы Тодд мог отделить пресловутую пшеницу для 3D-печати от плевел, ему было проще определять материалы по пищевой ценности, цене за килограмм и местоположению. Пришло время установить некоторые ограничения проекта.

Предположим, пищевой сорт материала оценивается по шкале от нуля до трех. Без смысла запрещенный в Калифорнии пластик с высоким содержанием бисфенола А. Три означает обычно используемые незагрязняющие материалы, такие как полиэтилен низкой плотности. Это сделано исключительно для упрощения нашего кода. На самом деле нам нужно было бы каким-то образом сопоставить текстовые описания этих материалов (например, «LDPE») с пищевой категорией.

Можно предположить, что цена за килограмм является свойством материала, указанным его производителем.

Местоположение, мы собираемся упростить и принять его за простое относительное расстояние по прямой. На противоположном конце спектра находится чрезмерно спроектированное решение: использование некоторого API (например, Google Maps) для определения приблизительного расстояния, на которое может пройти данный материал, чтобы добраться до распределительных центров Тодда. В любом случае, допустим, нам дается значение (километров до Тодда) в PDF-файлах Тодда.

Кроме того, давайте рассмотрим контекст, в котором мы работаем. Тодд эффективно действует как сборщик информации на динамичном рынке. Товары приходят и уходят, и их детали могут меняться. Это означает, что у нас есть произвольное количество PDF-файлов, которые можно изменить - или, что более важно, обновить - в любое время.

Итак, основываясь на этих ограничениях, мы, наконец, можем понять, чего мы хотим, чтобы наш код выполнял. Если вы хотите проверить свои дизайнерские способности, остановитесь здесь и подумайте, как вы структурируете свое решение. Это может выглядеть не так, как то, что я собираюсь описать. Это нормально, если вы предоставляете Тодду разумное работоспособное решение и то, что вы не стали бы рвать на волосах позже, пытаясь сохранить.

Базовая архитектура: решения

Итак, у нас есть произвольное количество PDF-файлов и некоторые правила их анализа. Вот как мы можем это сделать:

  1. Настройте объект Stream, который может читать с некоторого ввода. Как HTTP-клиент, запрашивающий загрузку PDF. Или написанный нами модуль, который считывает файлы PDF из каталога в файловой системе.
  2. Настроить посредника Буфер. Это похоже на то, как официант в ресторане доставляет готовое блюдо предполагаемому покупателю. Каждый раз, когда полный PDF-файл передается в поток, мы сбрасываем эти фрагменты в буфер, чтобы его можно было транспортировать.
  3. Официант (Buffer) доставляет еду (данные PDF) покупателю (наша функция синтаксического анализа). Клиент делает с ним то, что ему заблагорассудится (конвертирует в какой-либо формат электронной таблицы).
  4. Когда заказчик (Parser) закончит, сообщите официанту (Buffer), что он свободен и может работать с новыми заказами (PDF).

Вы заметите, что этому процессу нет однозначного конца. Как ресторан, наша комбинация Stream-Buffer-Parser никогда не завершается, пока, конечно, не перестанут поступать данные - не больше заказов.

Теперь я знаю, что кода еще нет. Это очень важно. Важно иметь возможность рассуждать о наших системах, прежде чем писать их. Теперь мы не получим все правильно с первого раза даже с априорными рассуждениями. В дикой природе всегда что-то ломается. Ошибки нужно исправить.

Тем не менее, планирование кода перед его написанием - мощное упражнение в сдержанности и дальновидности. Если вы сможете упростить системы возрастающей сложности до управляемых частей и аналогий, вы сможете экспоненциально повысить свою продуктивность, поскольку когнитивный стресс от этих сложностей превращается в хорошо продуманные абстракции.

В общем, это выглядит примерно так:

Введение в зависимости

Теперь в качестве отказа от ответственности я должен добавить, что существует целый мир размышлений о введении зависимостей в ваш код. Я хотел бы осветить эту концепцию в другом посте. А пока позвольте мне просто сказать, что один из фундаментальных конфликтов - это конфликт между нашим желанием быстро выполнить свою работу (то есть: избежать синдрома NIH) и нашим желанием избежать стороннего риска.

Применяя это к нашему проекту, я решил переложить основную часть обработки PDF-файлов в модуль pdfreader. Вот несколько причин, почему:

  • Он был опубликован недавно, что является хорошим признаком актуальности репо.
  • У него есть одна зависимость, то есть это просто абстракция над другим модулем, которая регулярно поддерживается на GitHub. Уже одно это - отличный знак. Более того, у зависимости, модуля под названием pdf2json, есть сотни звезд, 22 участника и множество наблюдателей, пристально наблюдающих за ней.
  • Сопровождающий, Адриан Джоли, хорошо ведет учет в системе отслеживания проблем GitHub и активно отвечает на вопросы пользователей и разработчиков.
  • При аудите через NPM (6.4.1) уязвимостей не обнаружено.

В общем, включение этой зависимости кажется безопасным.

Теперь модуль работает довольно просто, хотя его README явно не описывает структуру его вывода. На скале отмечается:

  1. Он предоставляет класс PdfReader для создания экземпляра
  2. В этом экземпляре есть два метода анализа PDF-файла. Они возвращают одинаковый вывод и отличаются только вводом: PdfReader.parseFileItems для имени файла и PdfReader.parseBuffer от данных, на которые мы не хотим ссылаться из файловой системы.
  3. Методы запрашивают обратный вызов, который вызывается каждый раз, когда PdfReader находит то, что он обозначает как элемент PDF. Есть три вида. Во-первых, метаданные файла, которые всегда являются первым элементом. Во-вторых, это метаданные страницы. Он действует как возврат каретки для координат обрабатываемых текстовых элементов. И, наконец, текстовые элементы, которые мы можем рассматривать как простые объекты / структуры со свойством текста и 2D-координатами AABB с плавающей запятой на странице.
  4. Задача нашего обратного вызова - обработать эти элементы в структуре данных по нашему выбору, а также обработать любые возникшие в ней ошибки.

Вот фрагмент кода в качестве примера:

const { PdfReader } = require('pdfreader');
// Initialise the reader
const reader = new PdfReader();
// Read some arbitrarily defined buffer
reader.parseBuffer(buffer, (err, item) => {
  if (err)
    console.error(err);
  else if (!item)
    /* pdfreader queues up the items in the PDF and passes them to
     * the callback. When no item is passed, it's indicating that
     * we're done reading the PDF. */
    console.log('Done.');
  else if (item.file)
    // File items only reference the PDF's file path.
    console.log(`Parsing ${item.file && item.file.path || 'a buffer'}`)
  else if (item.page)
    // Page items simply contain their page number.
    console.log(`Reached page ${item.page}`);
  else if (item.text) {
    // Text items have a few more properties:
    const itemAsString = [
      item.text,
      'x: ' + item.x,
      'y: ' + item.y,
      'w: ' + item.width,
      'h: ' + item.height,
    ].join('\n\t');
    console.log('Text Item: ', itemAsString);
  }
});

PDF-файлы Тодда

Давайте вернемся к ситуации с Тоддом, чтобы показать некоторый контекст. Мы хотим хранить данные пустышки на основе трех ключевых критериев:

  • их пищевой, чтобы сохранить здоровье младенцев,
  • их стоимость, для экономичного производства, и
  • расстояние до Тодда, чтобы поддержать маркетинговую копию компании, утверждающую, что их цепочка поставок также этична и загрязняет как можно меньше.

Я жестко запрограммировал простой скрипт, который рандомизирует некоторые фиктивные продукты, и вы можете найти его в каталоге / data сопутствующего репозитория для этого проекта. Этот сценарий записывает эти рандомизированные данные в файлы JSON.

Там также есть шаблон документа. Если вы знакомы с шаблонизаторами, такими как Handlebars, то вы это поймете. Существуют онлайн-сервисы - или, если вы любите приключения, вы можете использовать свои собственные - которые принимают данные JSON, заполняют шаблон и возвращают его вам в виде PDF-файла. Возможно, для полноты картины мы можем попробовать это в другом проекте. В любом случае: я использовал такую ​​службу для создания фиктивных PDF-файлов, которые мы будем анализировать.

Вот как это выглядит (лишние пробелы удалены):

Мы хотели бы получить из этого PDF-файла JSON, который дает нам:

  • идентификатор и дата заявки, для целей бухгалтерского учета,
  • артикул соски-пустышки для уникальной идентификации и
  • свойства соски (имя, сорт еды, цена за единицу и расстояние), чтобы Тодд действительно мог использовать их в своей работе.

как нам это сделать?

Чтение данных

Сначала давайте настроим функцию для чтения данных из одного из этих PDF-файлов и извлечения PDF-элементов pdfreader в пригодную для использования структуру данных. А пока у нас есть массив, представляющий документ. Каждый элемент в массиве - это объект, представляющий коллекцию всех текстовых элементов на странице по индексу этого объекта. Каждое свойство в объекте страницы имеет значение y для своего ключа и массив текстовых элементов, найденных при этом значении y для его значения. Вот диаграмма, чтобы ее было проще понять:

Функция readPDFPages в /parser/index.js обрабатывает это, как и в примере кода, написанном выше:

/* Accepts a buffer (e.g.: from fs.readFile), and parses
 * it as a PDF, giving back a usable data structure for
 * application-specific, second-level parsing.
 */
function readPDFPages (buffer) {
  const reader = new PdfReader();
  // We're returning a Promise here, as the PDF reading
  // operation is asynchronous.
  return new Promise((resolve, reject) => {
    // Each item in this array represents a page in the PDF
    let pages = [];
    reader.parseBuffer(buffer, (err, item) => {
      if (err)
        // If we've got a problem, eject!
        reject(err)
      else if (!item)
        // If we're out of items, resolve with the data structure
        resolve(pages);
      else if (item.page)
        // If the parser's reached a new page, it's time to
        // work on the next page object in our pages array.
        pages.push({});
      else if (item.text) {
        // If we have NOT got a new page item, then we need
        // to either retrieve or create a new "row" array
        // to represent the collection of text items at our
        // current Y position, which will be this item's Y
        // position.
        // Hence, this line reads as,
        // "Either retrieve the row array for our current page,
        //  at our current Y position, or make a new one"
        const row = pages[pages.length-1][item.y] || [];
        // Add the item to the reference container (i.e.: the row)
        row.push(item.text);
        // Include the container in the current page
        pages[pages.length-1][item.y] = row;
      }
    });
  });
}

Итак, теперь, передав буфер PDF в эту функцию, мы получим некоторые организованные данные. Вот что я получил после пробного запуска и распечатал его в формате JSON:

[ { '3.473': [ 'PRODUCT DETAILS REQUISITION' ],
    '4.329': [ 'Date: 23/05/2019' ],
    '5.185': [ 'Requsition ID: 298831' ],
    '6.898': [ 'Pacifier Tech', 'Todd Lerr' ],
    '7.754': [ '123 Example Blvd', 'DummEth Pty. Ltd.' ],
    '8.61': [ 'Timbuktu', '1337 Leet St' ],
    '12.235': [ 'SKU', '6308005' ],
    '13.466': [ 'Product Name', 'Square Lemon Qartz Pacifier' ],
    '14.698': [ 'Food Grade', '3' ],
    '15.928999999999998': [ '$ / kg', '1.29' ],
    '17.16': [ 'Location', '55' ] } ]

Если вы внимательно посмотрите, то заметите орфографическую ошибку в исходном PDF-файле. «Запрос» написан с ошибкой как «Запрос». Прелесть нашего парсера в том, что мы не особо заботимся о подобных ошибках во входных документах. Если они правильно структурированы, мы можем точно извлекать из них данные.

Теперь нам просто нужно организовать это во что-то более удобное для использования (как если бы мы открывали его через API). Структура, которую мы ищем, выглядит примерно так:

{
  reqID: '000000',
  date: 'DD/MM/YYYY', // Or something else based on geography
  sku: '000000',
  name: 'Some String We Have Trimmed',
  foodGrade: 'X',
  unitPrice: 'D.CC',  // D for Dollars, C for Cents
  location: 'XX',
}

В сторону: целостность данных

Почему мы включаем числа в виде строк? Он основан на риске синтаксического анализа. Скажем так, мы преобразовали все наши числа в строки:

Цена за единицу и расположение подойдут - в конце концов, они должны быть счетными числами.

Пищевой сорт для этого очень ограниченного проекта технически безопасен. Никакие данные не теряются, когда мы их приводим, но если это действительно классификатор, такой как Enum, то его лучше сохранить в виде строки.

Однако ID заявки и SKU, если они будут преобразованы в строки, могут потерять важные данные. Если идентификатор данной заявки начинается с трех нулей, и мы приводим его к числу, что ж, мы только что потеряли эти нули и испортили данные.

Так как мы хотим целостности данных при чтении PDF-файлов, мы просто оставляем все как String. Если код приложения хочет преобразовать некоторые поля в числа, чтобы их можно было использовать для арифметических или статистических операций, то мы позволим принуждение произойти на этом уровне. Здесь нам просто нужно что-то, что последовательно и точно анализирует PDF-файлы.

Реструктуризация данных

Итак, теперь у нас есть информация Тодда, нам просто нужно упорядочить ее в удобном для использования виде. Мы можем использовать множество функций манипулирования массивами и объектами, и здесь MDN - ваш друг.

Это этап, на котором у каждого свои предпочтения. Некоторые предпочитают метод, который просто выполняет свою работу и сводит к минимуму время разработки. Другие предпочитают искать лучший алгоритм для работы (например, сократить время итерации). Это хорошее упражнение, чтобы увидеть, сможете ли вы придумать способ сделать это и сравнить его с тем, что получил я. Я хотел бы видеть лучше, проще, быстрее или даже просто разные способы достижения одной и той же цели.

Во всяком случае, вот как я это сделал: функция parseToddPDF в /parser/index.js.

function parseToddPDF (pages) {
  const page = pages[0]; // We know there's only going to be one page
  // Declarative map of PDF data that we expect, based on Todd's structure
  const fields = {
    // "We expect the reqID field to be on the row at 5.185, and the
    //  first item in that array"
    reqID: { row: '5.185', index: 0 },
    date: { row: '4.329', index: 0 },
    sku: { row: '12.235', index: 1 },
    name: { row: '13.466', index: 1 },
    foodGrade: { row: '14.698', index: 1 },
    unitPrice: { row: '15.928999999999998', index: 1 },
    location: { row: '17.16', index: 1 },
  };
  const data = {};
  // Assign the page data to an object we can return, as per
  // our fields specification
  Object.keys(fields)
    .forEach((key) => {
      const field = fields[key];
      const val = page[field.row][field.index];
      // We don't want to lose leading zeros here, and can trust
      // any application / data handling to worry about that. This is
      // why we don't coerce to Number.
      data[key] = val;
    });
  // Manually fixing up some text fields so they're usable
  data.reqID = data.reqID.slice('Requsition ID: '.length);
  data.date = data.date.slice('Date: '.length);
  return data;
}

Мясо и картофель здесь находятся в цикле forEach и о том, как мы его используем. После получения позиции Y каждого текстового элемента до этого просто указать каждое поле, которое мы хотим, в качестве позиции в нашем объекте страниц. Фактически предоставляя карту для подражания.

Все, что нам нужно сделать, это объявить объект данных для вывода, выполнить итерацию по каждому указанному нами полю, следовать по маршруту в соответствии с нашей спецификацией и присвоить значение, которое мы находим в конце, нашему объекту данных.

После нескольких однострочников, чтобы привести в порядок некоторые строковые поля, мы можем вернуть объект данных, и мы отправляемся в гонки. Вот как это выглядит:

{ reqID: '298831',
  date: '23/05/2019',
  sku: '6308005',
  name: 'Square Lemon Qartz Pacifier',
  foodGrade: '3',
  unitPrice: '1.29',
  location: '55' }

Собираем все вместе

Теперь мы перейдем к созданию некоторого параллелизма для этого модуля синтаксического анализа, чтобы мы могли работать в масштабе, и распознали некоторые важные препятствия для этого. Приведенная выше диаграмма отлично подходит для понимания контекста логики синтаксического анализа. Это не очень важно для понимания того, как мы собираемся распараллеливать его. Мы можем лучше:

Я знаю, что это банально и, возможно, слишком обобщенно, чтобы мы могли использовать его на практике, но эй, это фундаментальная концепция, которую нужно формализовать.

Теперь, прежде всего, нам нужно подумать о том, как мы собираемся обрабатывать ввод и вывод нашей программы, что по сути будет заключать логику синтаксического анализа в оболочку, а затем распределять ее между рабочими процессами синтаксического анализатора. Здесь мы можем задать много вопросов и найти множество решений:

  • это будет приложение командной строки?
  • Будет ли это согласованный сервер с набором конечных точек API? Здесь есть свои вопросы - например, REST или GraphQL?
  • Может быть, это просто скелетный модуль в более широкой кодовой базе - например, что, если бы мы обобщили наш синтаксический анализ по набору двоичных документов и захотели отделить модель параллелизма от конкретного типа исходного файла и реализации синтаксического анализа?

Для простоты я собираюсь заключить логику синтаксического анализа в служебную программу командной строки. Это означает, что пора сделать несколько предположений:

  • ожидает ли он путей к файлам в качестве входных данных, и являются ли они относительными или абсолютными?
  • Или вместо этого ожидается, что объединенные данные PDF будут переданы по конвейеру?
  • Будет ли он выводить данные в файл? Потому что, если это так, то нам придется предоставить эту опцию в качестве аргумента, который должен указать пользователь ...

Обработка ввода в командной строке

Опять же, стараясь сделать все как можно проще: я решил, что программа ожидает список путей к файлам либо в виде отдельных аргументов командной строки:

node index file-1.pdf file-2.pdf … file-n.pdf

Или передается на стандартный ввод в виде списка путей к файлам, разделенных новой строкой:

# read lines from a text file with all our paths
cat files-to-parse.txt | node index
# or perhaps just list them from a directory
find ./data -name “*.pdf” | node index

Это позволяет процессу Node манипулировать порядком этих путей любым способом, который он считает нужным, что позволяет нам масштабировать код обработки позже. Для этого мы собираемся прочитать список путей к файлам, каким бы способом они ни были указаны, и разделить их произвольным числом на подсписки. Вот код, метод getTerminalInput в ./input/index.js:

function getTerminalInput (subArrays) {
  return new Promise((resolve, reject) => {
    const output = [];
  
    if (process.stdin.isTTY) {
      const input = process.argv.slice(2);
      const len = Math.min(subArrays, Math.ceil(input.length / subArrays));
      while (input.length) {
        output.push(input.splice(0, len));
      }
      resolve(output);
    } else {
    
      let input = '';
      process.stdin.setEncoding('utf-8');
      process.stdin.on('readable', () => {
        let chunk;
        while (chunk = process.stdin.read())
          input += chunk;
      });
      process.stdin.on('end', () => {
        input = input.trim().split('\n');
        const len = Math.min(input.length, Math.ceil(input.length / subArrays));
        while (input.length) {
          output.push(input.splice(0, len));
        }
        resolve(output);
      })
    
    }
    
  });
}

Зачем делить список? Допустим, у вас есть 8-ядерный процессор на потребительском оборудовании и 500 файлов PDF для анализа.

К сожалению для Node, хотя он фантастически обрабатывает асинхронный код благодаря своему циклу событий, он работает только в одном потоке. Для обработки этих 500 PDF-файлов, если вы не используете многопоточный (например, многопроцессорный) код, вы используете только восьмую часть своей вычислительной мощности. Предполагая, что эффективность памяти не является проблемой, вы можете обрабатывать данные до восьми раз быстрее, используя преимущества встроенных модулей параллелизма Node.

Разделение нашего ввода на куски позволяет нам это сделать.

Кроме того, это, по сути, примитивный балансировщик нагрузки, который явно предполагает, что рабочие нагрузки, представленные анализом каждого PDF-файла, взаимозаменяемы. То есть PDF-файлы имеют одинаковый размер и одинаковую структуру.

Это, очевидно, тривиальный случай, тем более что мы не учитываем обработку ошибок в рабочих процессах и то, какой воркер в настоящее время доступен для обработки новых нагрузок. В случае, если бы мы настроили сервер API для обработки входящих запросов на синтаксический анализ, нам пришлось бы учитывать эти дополнительные потребности.

Кластеризация нашего кода

Теперь, когда у нас есть входные данные, разделенные на управляемые рабочие нагрузки, по общему признанию, надуманным способом - я бы с удовольствием реорганизовал это позже - давайте рассмотрим, как мы можем их кластеризовать. Получается, что в Node есть два отдельных модуля для настройки параллельного кода.

Тот, который мы собираемся использовать, модуль cluster, в основном позволяет процессу Node порождать свои копии и балансировать обработку между ними по своему усмотрению.

Он построен на основе модуля child_process, который менее тесно связан с распараллеливанием самих программ Node и позволяет вам запускать другие процессы, такие как программы оболочки или другой исполняемый двоичный файл, и взаимодействовать с ними, используя стандартный ввод, вывод и т. Д. .

Я настоятельно рекомендую прочитать документацию по API для каждого модуля, так как они фантастически написаны, и даже если вы похожи на меня и находите бесцельное ручное чтение скучным и полной загруженностью, по крайней мере, ознакомьтесь Введение в каждый модуль поможет вам углубиться в тему и расширить свои знания об экосистеме Node.

Итак, давайте пройдемся по коду. Вот это оптом:

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
const { getTerminalInput } = require('./input');
(async function main () {
  if (cluster.isMaster) {
    const workerData = await getTerminalInput(numCPUs);
    for (let i = 0; i < workerData.length; i++) {
      const worker = cluster.fork();
      const params = { filenames: workerData[i] };
      worker.send(params);
    }
  } else {
    require('./worker');
  }
})();

Итак, наши зависимости довольно просты. Во-первых, это кластерный модуль, как описано выше. Во-вторых, нам нужен модуль os, чтобы точно определить, сколько ядер ЦП имеется на нашей машине, что является фундаментальным параметром разделения нашей рабочей нагрузки. Наконец, есть наша функция обработки ввода, которую я для полноты картины передал другому файлу.

Теперь основной метод на самом деле довольно прост. Фактически, мы могли бы разбить это на шаги:

  1. Если мы являемся основным процессом, разделите отправленные нам входные данные поровну по количеству ядер ЦП для этой машины.
  2. Для каждой нагрузки будущего воркера создайте воркера cluster.fork и настройте объект, который мы можем послать ему через межпроцессный канал сообщений RPC модуля [cluster], и послать ему эту чертову штуку.
  3. Если мы на самом деле не главный модуль, то мы должны быть работником - просто запустите код в нашем рабочем файле и завершите его.

Здесь не происходит ничего сумасшедшего, и это позволяет нам сосредоточиться на реальном подъеме, который заключается в выяснении того, как рабочий будет использовать список имен файлов, который мы ему даем.

Обмен сообщениями, асинхронный режим и потоки - все элементы полноценной диеты

Во-первых, как указано выше, позвольте мне сбросить код, на который вы можете сослаться. Поверьте мне, просмотр вначале позволит вам пропустить любое объяснение, которое вы сочтете тривиальным.

const Bufferer = require('../bufferer');
const Parser = require('../parser');
const { createReadStream } = require('fs');
process.on('message', async (options) => {
  const { filenames } = options;
  const parser = new Parser();
  const parseAndLog = async (buf) => console.log(await parser.parse(buf) + ',');
  const parsingQueue = filenames.reduce(async (result, filename) => {
    await result;
    return new Promise((resolve, reject) => {
      const reader = createReadStream(filename);
      const bufferer = new Bufferer({ onEnd: parseAndLog });
      reader
        .pipe(bufferer)
        .once('finish', resolve)
        .once('error', reject)
    
    });
  
  }, true);
  try {
    await parsingQueue;
    process.exit(0);
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
});

Здесь есть несколько грязных уловок, так что будьте осторожны, если вы один из непосвященных (только шучу). Давайте сначала посмотрим, что происходит:

Шаг первый - потребовать все необходимые ингредиенты. Имейте в виду, это основано на том, что делает сам код. Итак, позвольте мне просто сказать, что мы собираемся использовать настраиваемый поток с возможностью записи, который я ласково назвал Bufferer, оболочку для нашей логики синтаксического анализа из прошлого раза, также замысловато названную Parser и старый добрый надежный createReadStream из fs. модуль.

Вот где происходит волшебство. Вы заметите, что на самом деле в функции ничего не заключено. Весь рабочий код просто ждет сообщения в процесс - сообщения от его мастера с работой, которую он должен сделать в течение дня. Простите за средневековый язык.

Итак, прежде всего мы видим, что это асинхронно. Сначала мы извлекаем имена файлов из самого сообщения - если бы это был производственный код, я бы проверял их здесь. На самом деле, черт, я бы проверял их раньше в нашем коде обработки ввода. Затем мы создаем экземпляр нашего объекта синтаксического анализа - только один для всего процесса - это значит, что мы можем анализировать несколько буферов с помощью одного набора методов. Меня беспокоит то, что он управляет памятью изнутри, и, поразмыслив, стоит рассмотреть это позже.

Кроме того, существует простая оболочка parseAndLog для синтаксического анализа, которая регистрирует буфер PDF-файла в формате JSON с добавленной к нему запятой, просто для того, чтобы упростить объединение результатов синтаксического анализа нескольких PDF-файлов.

Наконец, суть дела - асинхронная очередь. Позволь мне объяснить:

Этот рабочий получил список имен файлов. Для каждого имени файла (или пути, на самом деле) нам нужно открыть читаемый поток через файловую систему, чтобы мы могли получить данные PDF. Затем нам нужно создать наш Bufferer (наш официант, следуя аналогии с рестораном ранее), чтобы мы могли перенести данные в наш парсер.

Bufferer изготавливается по индивидуальному заказу. Все, что он на самом деле делает, это принимает функцию для вызова, когда она получает все необходимые данные - здесь мы просто просим ее проанализировать и зарегистрировать эти данные.

Итак, теперь у нас есть все части, мы просто соединяем их вместе:

  1. Читаемый поток - файл PDF, каналы к Bufferer
  2. Bufferer завершает работу и вызывает наш parseAndLog метод для всего работника.

Весь этот процесс заключен в обещание, которое само возвращается в функцию уменьшения, внутри которой находится. Когда он разрешится, операция уменьшения продолжится.

Эта асинхронная очередь на самом деле является действительно полезным шаблоном, поэтому я расскажу о ней более подробно в моем следующем посте, который, вероятно, будет более объемным, чем несколько последних.

В любом случае остальная часть кода просто завершает процесс на основе обработки ошибок. Опять же, если бы это был производственный код, вы можете поспорить, что здесь было бы более надежное ведение журнала и обработка ошибок, но в качестве доказательства концепции это кажется нормальным.

Так работает, но полезно ли?

Вот и все. Это был небольшой путь, и он, безусловно, работает, но, как и любой другой код, важно проанализировать его сильные и слабые стороны. С верхней части моей головы:

  • Потоки нужно складывать в буферы. Это, к сожалению, лишает смысла использование потоков, и соответственно страдает эффективность памяти. Это необходимая клейкая лента для работы с модулем pdfreader. Мне бы хотелось посмотреть, есть ли способ передавать данные PDF в потоковом режиме и анализировать их на более мелком уровне. Особенно, если к нему все еще можно применить модульную функциональную логику синтаксического анализа.
  • На этом младенческом этапе логика синтаксического анализа также раздражающе хрупкая. Подумайте, а что, если у меня есть документ длиннее страницы? Куча предположений вылетает из окна и делает потребность в потоковой передаче данных PDF еще более острой.
  • Наконец, было бы здорово увидеть, как мы могли бы создать эту функциональность с помощью журналов и конечных точек API для предоставления общественности - за плату или бесплатно, в зависимости от контекста, в котором она используется.

Если у вас есть какие-либо конкретные критические замечания или замечания, я тоже хотел бы их услышать, поскольку обнаружение слабых мест в коде - это первый шаг к их исправлению. И, если вы знаете какой-либо лучший способ одновременной потоковой передачи и анализа PDF-файлов, дайте мне знать, и я могу оставить его здесь для всех, кто читает этот пост в поисках ответа. В любом случае - или для любой другой цели - отправьте мне электронное письмо или свяжитесь с Reddit.