Стратегия в Nodejs для объединения нескольких читаемых потоков

Я пытаюсь решить проблему с потоком Nodejs. Я читал документацию по потокам несколько раз и реализовал разные попытки решить эту проблему. Пробуем как с дуплексом, так и с преобразованием, с возможностью чтения и записи :)

У меня есть несколько потоков, читаемых по протоколу HTTP, и цель состоит в том, чтобы отправить данные в один конвейер с работающим противодавлением. Думаю, эта картинка помогает объяснить проблему:

введите описание изображения здесь

Обновление (13 сентября 2017 г.). После повторного прочтения документации я реализую настраиваемый записанный дуплексный поток.


person DauleDK    schedule 12.09.2017    source источник


Ответы (1)


Это представляет собой отличный вариант использования дуплексного потока в сочетании с ручным управлением потоком HTTP-потока.

Я написал собственный дуплексный поток, в котором читаемая и записываемая часть структурирована следующим образом:

введите описание изображения здесь

Если вас интересует конкретный код для дуплексного потока, отправьте мне сообщение в личку.

Код мог бы выглядеть примерно так (но он довольно старый и, вероятно, можно было бы еще больше упростить):

import 'rxjs/add/operator/skip';
import 'rxjs/add/operator/take';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import * as stream from 'stream';
import { logger, streamInspector } from '../shared';

export class DuplexStreamLinker extends stream.Duplex {
    public readCount: number = 0;
    public acceptDataCount: number = 0;
    public acceptData$: BehaviorSubject<boolean>;
    public streamName: string;

    constructor(options) {
        super(options);
        this.streamName = this.constructor.name;
        this.acceptData$ = new BehaviorSubject(false);
        streamInspector(this, this.constructor.name);
    }

    public _read(size) {
        this.readCount++;
        this.acceptData$.next(true);
    }

    public _write(chunk, encoding, cb) {
        const acceptData = this.acceptData$.getValue();
        if (acceptData) {
            cb(this.pushData(chunk));
        } else {
            this.acceptData$.skip(1).take(1).subscribe(() => {
                logger.silly('I dont fire...');
                this.acceptDataCount++;
                cb(this.pushData(chunk));
            });
        }
    }

    public endReadableStream() {
        logger.debug('DuplexStreamLinker@endReadableStream was called!');
        this.end();
        this.push(null);
    }

    public _final(cb) {
        logger.debug('DuplexStreamLinker@_final was called!');
        cb(null);
    }

    private pushData(chunk): null | Error {
        const ok = this.push(chunk);
        if (ok === false) { this.acceptData$.next(false); }
        return null;
    }

}
person DauleDK    schedule 13.09.2017
comment
у вас еще есть код, относящийся к этой диаграмме? - person Sergio Gutiérrez; 25.03.2021
comment
Привет, @ SergioGutiérrez - ›Я добавил код, но имейте в виду, что он действительно старый. - person DauleDK; 25.03.2021
comment
Ух ты! Отлично, спасибо! - person Sergio Gutiérrez; 26.03.2021