Как я могу создавать потоки `flatmap` в Rust?

У меня есть rusoto_core::ByteStream, который реализует фьючерсная черта Stream:

let chunks = vec![b"1234".to_vec(), b"5678".to_vec()];
let stream = ByteStream::new(stream::iter_ok(chunks));

Я хочу передать его в метод actix_web HttpResponseBuilder::streaming.

use actix_web::dev::HttpResponseBuilder; // 0.7.18
use rusoto_core::ByteStream; // 0.36.0

fn example(stream: ByteStream, builder: HttpResponseBuilder) {
    builder.streaming(stream);
}

Когда я пытаюсь это сделать, я получаю следующую ошибку:

error[E0271]: type mismatch resolving `<rusoto_core::stream::ByteStream as futures::stream::Stream>::Item == bytes::bytes::Bytes`
 --> src/main.rs:5:13
  |
5 |     builder.streaming(stream);
  |             ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::bytes::Bytes`
  |
  = note: expected type `std::vec::Vec<u8>`
             found type `bytes::bytes::Bytes`

Я считаю, что причина в том, что streaming() ожидает S: Stream<Item = Bytes, Error> (т. е. Item = Bytes), но мой ByteStream имеет Item = Vec<u8>. Как я могу это исправить?

Я думаю, что решение состоит в том, чтобы как-то flatmap мой ByteStream но я не смог найти такой метод для потоков.

Вот пример использования streaming():

let text = "123";
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));

HttpResponse::Ok()
    .streaming(rx_body.map_err(|e| error::ErrorBadRequest("bad request")))

person H. Desane    schedule 30.01.2019    source источник


Ответы (1)


Как я могу flatmap вести стримы в Rust?

Плоская карта преобразует итератор итераторов в один итератор (или поток вместо итератора).

Фьючерсы 0,3

В Futures 0.3 нет прямой плоской карты, но есть StreamExt::flatten, который можно использовать после StreamExt::map.

use futures::{stream, Stream, StreamExt}; // 0.3.1

fn into_many(i: i32) -> impl Stream<Item = i32> {
    stream::iter(0..i)
}

fn nested() -> impl Stream<Item = i32> {
    let stream_of_number = into_many(5);
    let stream_of_stream_of_number = stream_of_number.map(into_many);
    let flat_stream_of_number = stream_of_stream_of_number.flatten();

    // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
    flat_stream_of_number
}

Фьючерсы 0,1

В Futures 0.1 нет прямой плоской карты, но есть Stream::flatten, который можно использовать после Stream::map.

use futures::{stream, Stream}; // 0.1.25

fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(0..i)
}

fn nested() -> impl Stream<Item = i32, Error = ()> {
    let stream_of_number = into_many(5);
    let stream_of_stream_of_number = stream_of_number.map(into_many);
    let flat_stream_of_number = stream_of_stream_of_number.flatten();

    // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
    flat_stream_of_number
}

Однако это не решает вашу проблему.

streaming() ожидает S: Stream<Item = Bytes, Error> (то есть Item = Bytes), но мой ByteStream имеет Item = Vec<u8>

Да, это проблема. Используйте Bytes::from через Stream::map для преобразования ваш поток Item из одного типа в другой:

use bytes::Bytes; // 0.4.11
use futures::Stream; // 0.1.25 

fn example(stream: ByteStream, mut builder: HttpResponseBuilder) {
    builder.streaming(stream.map(Bytes::from));
}
person Shepmaster    schedule 30.01.2019