Распространенная проблема при работе с потоками в реальном времени заключается в том, что вы не знаете, какие данные туда попадают, из-за огромного количества систем, подключенных к нему и производящих данные. Поэтому интересно иметь возможность «сэмплировать» поток, когда вы подключаетесь к потоку со своими учетными данными, ждете, пока не придет событие, а затем завершаете соединение.

Но как мы можем сделать это легко? Какой код может позволить нам это сделать? Ну, есть некоторые части, которые мы должны иметь в виду при проектировании:

  1. Потоки могут быть от разных производителей (Azure Event Hub, Apache Kafka, Socket Stream и т. д.)
  2. Потоки могут получать события в очень большом промежутке времени (> 10 минут)
  3. Потоки могут быть очень быстрыми (миллионы событий в секунду).
  4. Потоки могут иметь события, поступающие не по порядку

В рамках нашего небольшого проекта находятся номера 1, 2 и 3. Итак, давайте обсудим, как мы можем решить их и как это приведет к созданию нашего семплера.

1. Стримы могут быть от разных производителей

У каждого производителя есть свой SDK. Но как сделать сэмплер, поддерживающий более одного? Ну, это все об интерфейсах! (или на самом деле чаще называется Шаблон стратегии).

Мы создадим 2 метода (open() и close()), которые должны быть реализованы в наших стратегиях, чтобы наш родительский класс мог вызывать эти методы, не беспокоясь о том, есть ли они в базовом классе или нет.

IStream.ts

export default interface IStream {
    open(e: EventEmitter) : void;
    close() : void;
}

Таким образом, мы можем относительно легко реализовать провайдеров для:

2. Стримы могут иметь большой промежуток времени между событиями

Поскольку потоки не всегда отправляют события каждые X секунд, мы должны убедиться, что при разработке семплера мы учитываем это. Поэтому нам нужно создать своего рода механизм «тайм-аута», который убивает поток, если в течение следующих секунд ничего не получено.

В Javascript мы можем относительно легко сделать это, используя функцию setTimeout, которая будет вызывать функцию после ее выполнения.

const timeout = 1000; // 1 second
let timeoutFunction = setTimeout(() => console.log('triggered'), timeout);

3. В потоках события могут прибывать очень быстро

Потоки по своей природе должны быть быстрыми, так как же нам получить только одно событие? Большинство провайдеров позволяют обрабатывать входящие сообщения. Но чтобы мгновенно остановиться, когда что-то прибудет, лучше всего использовать EventEmitter, который сработает, как только что-то прибудет.

Таким образом, наш основной процесс может перехватить это событие и вызвать наш метод close() из точки 1.

Для нашего Socket Stream это выглядит так (с onData() именно так):

async open(eventEmitter) {
    this.eventEmitter = eventEmitter;
    return new Promise((resolve, reject) => {
        this.connection = net.connect(this.port, this.host, () => {
            return resolve();
        });
        this.connection.on('data', this.onData.bind(this));
        console.log('[StreamSocket] Stream Opened');
    });
}
onData(msg) {
    this.eventEmitter.emit('stream_message_received', msg);
}

4. Слияние 2 промисов только с одним срабатыванием

Теперь самое сложное: «Как нам отменить еще одно обещание, если другое сработало?»

Чтобы решить эту проблему, я использовал EventEmitter как своего рода концепцию bus. Различные обещания (тайм-аут или полученное событие) затем запускают событие через этот EventEmitter, так что once() событие получено, оно возвращает основное обещание.

Иллюстрируя это:

let bus = new EventEmitter();
let result = await new Promise(async (resolve, reject) => {
    bus.once('event_1', (message) => resolve('event1'));
    bus.once('event_2', () => resolve('event2'));
    // Fire event 1 or 2
    setTimeout(() => bus.emit('event_1'), Math.random() * 4000);
    setTimeout(() => bus.emit('event_2'), Math.random() * 4000);
});
console.log(result);

5. Заключение

Объединив приведенные выше концепции, мы теперь можем написать сэмплер, который будет подключаться к нашим различным потокам и ждать, пока ИЛИ не прибудет сообщение ИЛИ не будет получен тайм-аут, после чего он вернет нам результат через API, который можно записать.

Чтобы увидеть это в рабочем коде, не стесняйтесь проверить этот репозиторий: https://github.com/thebillkidy/PublicProjects/tree/master/JS/Azure/EventHub/StreamSample.