Как я могу использовать оператор вопросительного знака для обработки ошибок в Tokio Futures?

У меня есть клиент, работающий с Future, который кое-что делает. Можно ли использовать impl Future<Item = (), Error = io::Error> в качестве возвращаемого типа и улучшить обработку ошибок?

pub fn handle_client(client: Client) -> impl Future<Item = (), Error = io::Error> {
    let magic = client.header.magic;
    let stream_client = TcpStream::connect(&client.addr).and_then(|stream| {
        let addr: Vec<u8> = serialize_addr(stream.local_addr()?, magic)?;
        write_all(stream, addr).then(|result| {
            // some code
            Ok(())
        })
    });
    stream_client
}

Я не могу сохранить тип io::Error во всех вложенных закрытиях / фьючерсах. Компилятор выдает ошибку

error[E0277]: the `?` operator can only be used in a function that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
   --> src/client.rs:134:29
    |
134 |         let addr: Vec<u8> = serialize_addr(stream.local_addr()?, magic)?;
    |                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in a function that returns `futures::future::then::Then<tokio_io::io::write_all::WriteAll<tokio_tcp::stream::TcpStream, std::vec::Vec<u8>>, std::result::Result<(), std::io::Error>, [closure@src/client.rs:135:38: 138:10]>`
    |
    = help: the trait `std::ops::Try` is not implemented for `futures::future::then::Then<tokio_io::io::write_all::WriteAll<tokio_tcp::stream::TcpStream, std::vec::Vec<u8>>, std::result::Result<(), std::io::Error>, [closure@src/client.rs:135:38: 138:10]>`
    = note: required by `std::ops::Try::from_error`

Я выполнил цепочку обработки ошибок map / and_then, но проблема в том, что я не знаю, как получить TcpStream внутри окончательного .then закрытия. Единственное место, где я нашел TcpStream, - это внутри структуры WriteAll, но она приватная. Кроме того, write_all потребляет поток

use futures::Future;
use std::{io, net::SocketAddr};
use tokio::{
    io::{write_all, AsyncRead, AsyncWrite},
    net::TcpStream,
};

type Error = Box<dyn std::error::Error>;

fn serialize_addr(addr: SocketAddr) -> Result<Vec<u8>, Error> {
    Ok(vec![])
}

fn handle_client(addr: &SocketAddr) -> impl Future<Item = (), Error = Error> {
    TcpStream::connect(addr)
        .map_err(Into::into)
        .and_then(|stream| stream.local_addr().map(|stream_addr| (stream, stream_addr)))
        .map_err(Into::into)
        .and_then(|(stream, stream_addr)| serialize_addr(stream_addr).map(|info| (stream, info)))
        .map(|(stream, info)| write_all(stream, info))
        .then(|result| {
            let result = result.unwrap();
            let stream = match result.state {
                Writing { a } => a,
                _ => panic!("cannot get stream"),
            };
            // some code
            Ok(())
        })
}

fn main() {
    let addr = "127.0.0.1:8900".parse().unwrap();
    handle_client(&addr);
}

person abritov    schedule 28.02.2019    source источник
comment
Например, ваш код использует неопределенные типы и методы, сигнатуры которых нам неизвестны. Ваше сообщение об ошибке даже не соответствует коду.   -  person Shepmaster    schedule 28.02.2019
comment
Почему вы использовали map(... write_all), когда в приведенном ниже коде используется and_then? Вы не можете произвольно изменить вызываемые методы и ожидать, что они будут работать. При использовании and_then значение успеха будущего равно (TcpStream, Vec<u8>).   -  person Shepmaster    schedule 01.03.2019
comment
Не знаю почему, но без преобразования в serialize_addr(remote_addr).map_err(|_| io::Error::from(io::ErrorKind::AddrNotAvailable)) я не мог использовать .and_then вместо .map   -  person abritov    schedule 02.03.2019


Ответы (2)


TL; DR: вы не используете оператор ?.


Поскольку вы его не предоставили, вот MCVE вашей проблемы. Обратите внимание, что мы понятия не имеем, какой тип ошибки у вашей serialize_addr функции, поэтому мне пришлось кое-что выбрать:

use futures::Future;
use std::{io, net::SocketAddr};
use tokio::{io::write_all, net::TcpStream};

fn serialize_addr() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    Ok(vec![])
}

pub fn handle_client(addr: &SocketAddr) -> impl Future<Item = (), Error = io::Error> {
    TcpStream::connect(addr).and_then(|stream| {
        let addr = serialize_addr()?;
        write_all(stream, addr).then(|_result| Ok(()))
    })
}
error[E0277]: the `?` operator can only be used in a function that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
  --> src/lib.rs:11:20
   |
11 |         let addr = serialize_addr()?;
   |                    ^^^^^^^^^^^^^^^^^ cannot use the `?` operator in a function that returns `futures::future::then::Then<tokio_io::io::write_all::WriteAll<tokio_tcp::stream::TcpStream, std::vec::Vec<u8>>, std::result::Result<(), std::io::Error>, [closure@src/lib.rs:12:38: 14:10]>`
   |
   = help: the trait `std::ops::Try` is not implemented for `futures::future::then::Then<tokio_io::io::write_all::WriteAll<tokio_tcp::stream::TcpStream, std::vec::Vec<u8>>, std::result::Result<(), std::io::Error>, [closure@src/lib.rs:12:38: 14:10]>`
   = note: required by `std::ops::Try::from_error`

Как говорится в сообщении об ошибке:

оператор ? может использоваться только в функции, которая возвращает Result или Option (или другой тип, реализующий std::ops::Try)

и

нельзя использовать оператор ? в функции, возвращающей Then<WriteAll<TcpStream, Vec<u8>>, Result<(), io::Error>, [closure]>

Вместо этого используйте тот факт, что Result можно рассматривать как будущее, и позвольте ему участвовать в цепочке функций.

Кроме того, как и везде в Rust, вам нужен единый тип ошибки. Я выбрал Box<dyn Error> для простоты. Этого можно добиться с помощью map_err и Into::into

use futures::Future;
use std::net::SocketAddr;
use tokio::{io::write_all, net::TcpStream};

type Error = Box<dyn std::error::Error>;

fn serialize_addr() -> Result<Vec<u8>, Error> {
    Ok(vec![])
}

pub fn handle_client(addr: &SocketAddr) -> impl Future<Item = (), Error = Error> {
    TcpStream::connect(addr)
        .map_err(Into::into)
        .and_then(|stream| serialize_addr().map(|addr| (stream, addr)))
        .and_then(|(stream, addr)| write_all(stream, addr).map_err(Into::into))
        .then(|_result| Ok(()))
}

В будущем синтаксис async / await упростит выполнение этого.

person Shepmaster    schedule 28.02.2019

Решение для двух потоков:

fn handle_client(addr: &SocketAddr) -> impl Future<Item = (), Error = Error> {
    TcpStream::connect(addr)
        .map_err(Into::into)
        .and_then(|remote_stream| {
            remote_stream
                .local_addr()
                .map(|remote_addr| (remote_stream, remote_addr))
        })
        .map_err(Into::into)
        .and_then(|(remote_stream, remote_addr)| {
            TcpStream::connect(&"".parse().unwrap())
                .map(move |proxy_stream| (remote_stream, proxy_stream, remote_addr))
        })
        .and_then(|(remote_stream, proxy_stream, remote_addr)| {
            serialize_addr(remote_addr)
                .map(|info| (remote_stream, proxy_stream, info))
                .map_err(|_| io::Error::from(io::ErrorKind::AddrNotAvailable))
        })
        .and_then(|(remote_stream, proxy_stream, info)| {
            write_all(proxy_stream, info).map(|proxy_stream| (remote_stream, proxy_stream.0))
        })
        .and_then(|(remote_stream, proxy_stream)| {
            // working with streams
        })
        .then(|_result| Ok(()))
}
person abritov    schedule 03.03.2019