В современном цифровом мире данные распространены повсеместно, и становится все более важным эффективно обрабатывать их большие объемы. Один из способов сделать это — использовать потоковую передачу, процесс, который позволяет обрабатывать данные, как только они становятся доступными, вместо того, чтобы ждать прибытия всего набора данных. В этом сообщении блога рассматриваются тонкости потоковой передачи веб-данных с использованием Streams API, мощного инструмента, предоставляемого современными веб-браузерами.

Введение

Понимание потокового API

Streams API предоставляет интерфейс для чтения или записи асинхронных потоковых данных в веб-приложениях. Это означает, что вы можете начать обработку своих данных сразу же после их получения, а не ждать, пока загрузится весь набор данных. Это особенно полезно при работе с большими файлами или данными в реальном времени, где скорость и эффективность имеют первостепенное значение.

Преимущества потокового API

Есть несколько преимуществ использования Streams API. Прежде всего, это позволяет эффективно обрабатывать большие наборы данных, уменьшая использование памяти и повышая производительность. Кроме того, поскольку Streams API встроен в современные веб-браузеры, для него не требуются дополнительные библиотеки или плагины, что упрощает его реализацию и поддержку.

Понимание основ Streams API

Как работает API потоков

Streams API предоставляет методы для использования потоков данных из Streaming API. Это позволяет вам читать и записывать данные порциями, которые можно обрабатывать по мере поступления. Это делается путем вызова функции read(), которая возвращает промис, который разрешается с объектом, содержащим свойство «done» и свойство «value». Свойство «готово» указывает, есть ли еще фрагменты для чтения, а свойство «значение» содержит сам фрагмент данных.

Понимание синтаксиса

Обычный шаблон использования потоковых считывателей включает в себя написание функции, которая начинается с чтения потока. Если больше нет потока для чтения, вы выходите из функции. Если есть еще поток для чтения, вы обрабатываете текущий фрагмент, а затем снова запускаете функцию. Этот процесс повторяется до тех пор, пока не закончится поток для чтения.

Чтение и запись потоков данных

Чтение из потока выполняется путем вызова метода read() для объекта чтения. При этом из потока считывается один фрагмент, который затем можно обрабатывать по мере необходимости. С другой стороны, запись в поток выполняется путем вызова метода write() объекта записи и передачи данных, которые вы хотите записать.

Интерфейсы потоков
В Streams API есть три основных интерфейса, с которыми вы будете работать: потоки для чтения, потоки для записи и потоки преобразования. Каждый из них играет решающую роль в процессе потоковой передачи, позволяя вам эффективно обрабатывать данные и манипулировать ими.

Потоки с возможностью чтения
Потоки с возможностью чтения представляют собой источник данных, из которого вы можете читать. Они создаются с помощью конструктора Readable Stream. Как только читаемый поток создан, вы можете вызвать его метод getReader() для создания объекта чтения. Затем этот объект чтения используется для чтения фрагментов данных из потока.

Потоки с возможностью записи
Потоки с возможностью записи представляют собой место назначения, куда вы можете записывать данные. Они создаются с помощью конструктора Writable Stream. Подобно потокам с возможностью чтения, после создания потока с возможностью записи вы можете вызвать его метод getWriter() для создания объекта записи. Затем этот объект записи используется для записи фрагментов данных в поток.

Потоки преобразования
Потоки преобразования представляют собой комбинацию потоков, доступных для чтения и записи, что позволяет преобразовывать данные по мере их чтения или записи. Это полезно для таких операций, как сжатие или шифрование. Потоки преобразования создаются с помощью конструктора Transform Stream. У них есть как сторона для чтения, так и сторона для записи, поэтому вы можете читать данные с одной стороны и записывать преобразованные данные на другую.

Работа с объектом Readable Stream

Объект Readable Stream представляет собой источник данных, из которого вы можете считывать контролируемым образом. У него есть методы для чтения данных из потока, проверки выполнения потока или ошибки, а также блокировки потока, чтобы предотвратить чтение из него другим кодом, пока вы все еще обрабатываете данные.

Чтение содержимого потока

Вы можете прочитать данные из Readable Stream, вызвав его метод read(). Это возвращает обещание, которое разрешается с помощью объекта, содержащего свойство «done» и свойство «value». Свойство «готово» указывает, есть ли еще фрагменты для чтения, а свойство «значение» содержит сам фрагмент данных.

Проверка выполнения потока или ошибки

После вызова метода read() вы можете проверить, выполнен ли поток или нет ошибки, посмотрев на свойство «done» возвращаемого объекта. Если «готово» верно, больше не нужно читать куски. Если в потоке возникает ошибка, обещание будет отклонено с соответствующей ошибкой.

Вот простой пример того, как вы можете использовать метод `getReader()` и рекурсивное чтение для реализации потоковой передачи в JavaScript с помощью Streams API:

// Assume response is a Response object from fetch API
const stream = response.body; // get ReadableStream

const reader = stream.getReader(); // get a reader

// Recursive function to read each chunk as it arrives
function readStream() {
  return reader.read()
   .then(({ done, value }) => {
       if (done) {
         console.log('Stream complete');
         return;
       }
       console.log(value); // process chunk of data
       return readStream(); // recursively call itself until done
    });
}

readStream().catch(console.error);

В этом коде мы сначала получаем `ReadableStream` из объекта `Response` (при условии, что мы получили `Response` из API `fetch`). Затем мы получаем читатель для потока, используя метод getReader(). Мы определяем рекурсивную функцию `readStream()`, которая читает фрагмент из потока с помощью метода `read()`, обрабатывает этот фрагмент и затем вызывает себя для чтения следующего фрагмента. Это продолжается до тех пор, пока не останется фрагментов для чтения. Если при чтении потока возникает ошибка, она будет обнаружена и зарегистрирована в консоли.

Декодирование фрагмента с помощью TextDecoder()

Интерфейс TextDecoder в JavaScript — это встроенный глобальный объект, предоставляющий метод декодирования потока двоичных данных в текст. Это особенно полезно при работе с потоками, поскольку они часто предоставляют данные в форме ArrayBuffers или других двоичных форматов.
Чтобы использовать TextDecoder, вы сначала создаете его новый экземпляр. Затем вы можете вызвать его метод decode(), передав фрагмент данных, который вы хотите декодировать. Это вернет строку, содержащую декодированный текст.

Вот пример того, как вы можете использовать TextDecoder для декодирования фрагмента данных из потока:

// Assume response is a Response object from fetch API
const stream = response.body; // get ReadableStream
const reader = stream.getReader(); // get a reader
const decoder = new TextDecoder('utf-8'); // create a new TextDecoder

// Recursive function to read each chunk as it arrives
function readStream() {
  return reader.read()
     .then(({ done, value }) => {
        if (done) {
         console.log('Stream complete');
         return;
        }

        const text = decoder.decode(value); // decode the chunk of data
        console.log(text); // process the decoded text

        return readStream(); // recursively call itself until done
     });
}

readStream().catch(console.error);

В этом коде мы сначала создаем TextDecoder с кодировкой utf-8. Затем внутри нашей функции `readStream()` мы используем метод `decode()` TextDecoder` для декодирования каждого фрагмента данных в строку. Затем мы обрабатываем эту строку (в данном случае просто выводим ее на консоль) и продолжаем чтение следующего фрагмента данных до тех пор, пока не закончатся фрагменты для чтения.

Помните, что не все двоичные данные являются текстом, и попытка декодировать нетекстовые данные как текст может привести к искажению вывода. Поэтому важно использовать TextDecoder только тогда, когда вы знаете, что данные, с которыми вы имеете дело, на самом деле являются текстом.

Понимание заблокированного свойства в Streams API

Свойство «заблокировано» в Streams API указывает, заблокирован ли поток для чтения. Когда поток заблокирован, вы не можете получить для него другой считыватель или напрямую управлять потоком. Это полезно для обеспечения того, чтобы несколько фрагментов кода не пытались читать из потока одновременно, что может привести к ошибкам или несоответствиям.

Примеры использования Streams API

Пример 1. Реализация «кричащей» версии Fetch()

Преобразование чанков в верхний регистр

Представьте, что вы получаете поток текстовых данных и хотите преобразовать весь текст в верхний регистр по мере его поступления. Вы можете сделать это с помощью Streams API, создав пользовательский поток преобразования, который преобразует каждый фрагмент в верхний регистр перед его передачей. Это позволяет обрабатывать данные в режиме реального времени, не дожидаясь загрузки всего набора данных.

Добавление к потоку DOM

Еще одним интересным вариантом использования Streams API является добавление данных в модель DOM в режиме реального времени. Например, предположим, что вы получаете большой HTML-документ по частям. С помощью Streams API вы можете добавлять каждый фрагмент в модель DOM, как только он поступает, предоставляя пользователю немедленную обратную связь и более плавный просмотр.

Чтение и запись чанка

Чтение из потока и регистрация каждого фрагмента данных — простая задача с Streams API. Вызывая метод read() в цикле, вы можете регистрировать каждую порцию данных по мере их поступления. Это полезно для наблюдения за ходом загрузки данных или для отладки вашего кода.

Пример 2. Приложение потокового чата с Streams API

Представьте, что вы создаете чат-приложение, которое должно обрабатывать большие объемы данных в режиме реального времени. Вы можете использовать Streams API для эффективной обработки входящих сообщений и отображения их пользователю в режиме реального времени.

Вот как вы можете настроить простой поток чата:

// Assume socket is a WebSocket connection
const socket = new WebSocket('wss://your-chat-server.com');
const stream = new ReadableStream({
   start(controller) {
      // When a message is received, enqueue it into the stream
      socket.onmessage = event => controller.enqueue(event.data);
      socket.onclose = () => controller.close();
   }
});

// Get a reader for the stream
const reader = stream.getReader();

// Recursive function to read each message as it arrives
function readStream() {
   return reader.read()
     .then(({ done, value }) => {
        if (done) {
          console.log('Chat stream complete');
          return;
        }

        // Process the message (e.g., display it in the UI)
        console.log(value);
        return readStream(); // recursively call itself until done
    });
}

readStream().catch(console.error);

В этом коде мы сначала создаем `ReadableStream`, который ставит в очередь сообщения из соединения WebSocket в поток. Затем мы получаем средство чтения для потока и определяем рекурсивную функцию для чтения каждого сообщения по мере его поступления и отображения его в пользовательском интерфейсе. Это продолжается до тех пор, пока не останется сообщений для чтения.

Практический пример 3. Прямая трансляция видео с помощью Streams API

Streams API также можно использовать для обработки видеопотоков. Например, предположим, вы создаете платформу для потокового видео в реальном времени. Вы можете использовать Streams API для эффективной обработки входящих видеоданных и отображения их пользователю в режиме реального времени.

Вот базовый пример того, как вы можете настроить видеопоток:

// Assume fetchVideo is a function that fetches video data from a server
const response = await fetchVideo('https://your-video-server.com/video');

// Get a reader for the video stream
const reader = response.body.getReader();

// Create a new MediaSource
const mediaSource = new MediaSource();
const url = URL.createObjectURL(mediaSource);

// Set the MediaSource URL as the source for a video element
const video = document.querySelector('video');
video.src = url;

mediaSource.addEventListener('sourceopen', () => {
  const sourceBuffer = mediaSource.addSourceBuffer('video/webm; codecs="vp8"');

 // Recursive function to read each chunk of video data as it arrives
 function readStream() {
    return reader.read()
       .then(({ done, value }) => {
          if (done) {
            mediaSource.endOfStream();
            return;
          }

          // Append the chunk of video data to the SourceBuffer
          sourceBuffer.appendBuffer(value);
          return readStream(); // recursively call itself until done
       });
  }

  readStream().catch(console.error);
});

В этом коде мы сначала получаем некоторые видеоданные с сервера и получаем ридер для видеопотока. Затем мы создаем «MediaSource», устанавливаем его URL-адрес в качестве источника для видеоэлемента и добавляем «SourceBuffer» к «MediaSource». Мы определяем рекурсивную функцию для чтения каждого фрагмента видеоданных по мере их поступления и добавления их в `SourceBuffer`. Это продолжается до тех пор, пока не останется фрагментов для чтения.

Расширенные концепции в Streams API

Асинхронная итерация в Streams API

Понимание асинхронной итерации.Асинхронная итерация — это функция JavaScript, позволяющая выполнять итерацию по данным, сгенерированным асинхронно, например данным, поступающим из потока. Это достигается с помощью цикла for await…of, который ожидает разрешения каждого промиса, прежде чем перейти к следующей итерации. Это упрощает обработку каждого фрагмента данных по мере их поступления, не блокируя остальную часть вашего кода.

Реализация асинхронной итерации с помощью полифилла. Если вы работаете в среде, изначально не поддерживающей асинхронную итерацию, вы все равно можете использовать ее, реализовав полифилл. Полифил — это фрагмент кода, обеспечивающий современные функции в старых средах, которые изначально не поддерживают его. Включив полифилл для асинхронной итерации, вы можете использовать цикл for await…of для обработки потоков данных.

Конвейер и цепочка в Streams API

Конвейерная связь — это ключевая концепция в Streams API, позволяющая соединять несколько потоков вместе. Это означает, что вы можете взять читаемый поток, направить его через один или несколько потоков преобразования, а затем направить вывод в доступный для записи поток. Данные проходят через конвейер автоматизации автоматически, преобразовываясь по ходу.

Сцепление – это родственное понятие, которое относится к созданию цепочки из нескольких потоков преобразования. Выходные данные одного потока преобразования становятся входными данными для следующего и так далее. Это позволяет применять к вашим данным несколько преобразований простым и линейным способом.

Streams API также предоставляет ряд других функций, таких как обработка обратного давления и распространение ошибок, которые делают его еще более мощным и гибким. Обработка противодавления гарантирует, что производитель не перегружает потребителя данными, в то время как распространение ошибок позволяет передавать ошибки по конвейеру и обрабатывать их соответствующим образом.

Заключение

Заключительные мысли о Streams API

Streams API — это мощный инструмент для обработки потоковых данных в веб-приложениях. Он предоставляет простой и согласованный интерфейс для чтения и записи данных фрагментами, что позволяет начать обработку данных сразу после их получения. Независимо от того, имеете ли вы дело с большими файлами, данными в режиме реального времени или любым другим типом потоковых данных, Streams API поможет вам эффективно и действенно с этим справиться.

В целом, Streams API — это универсальный инструмент, позволяющий легко и эффективно обрабатывать большие объемы данных. Независимо от того, работаете ли вы с файлами, сетевыми запросами или любыми другими типами данных, Streams API предоставляет согласованный и простой в использовании интерфейс для потоковой передачи данных.

Я надеюсь, что это всеобъемлющее руководство по освоению Streams API оказалось полезным для вас. Если вам понравилось это читать, рассмотрите возможность аплодировать и подписаться на меня, чтобы получать больше подобного контента. Вы также можете связаться со мной в LinkedIn и ознакомиться с другими моими проектами на GitHub.

LinkedIn
GitHub

Оставайтесь с нами для более подробных руководств и руководств!