Ручной опрос потоков в будущей реализации

Я сейчас перехожу на futures 0.3 и tokio 0.2, и есть один повторяющийся шаблон, который я не могу использовать повторно. Я не уверен, устарел ли этот шаблон или я что-то делаю не так с Pin.

Обычно у меня есть один тип с розеткой и несколько канальных приемников. Реализация Future для таких структур состоит в многократном опросе потоков до тех пор, пока они не вернут Pending (NotReady в экосистеме 0.1).

Однако во фьючерсах 0.3 Future::poll и Stream::poll_next принимают self вместо &mut self, и этот шаблон больше не работает:

use futures::{
    stream::Stream,
    task::{Context, Poll},
    Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};

/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;

impl State {
    fn update(&mut self, _data: Vec<u8>) {
        println!("updated state");
    }
    fn handle_event(&mut self, _event: u32) {
        println!("handled event");
    }
}

/// The future I want to implement.
struct MyFuture {
    state: State,
    data: Receiver<Vec<u8>>,
    events: Receiver<Vec<u8>>,
}

impl MyFuture {
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        let MyFuture {
            ref mut data,
            ref mut state,
            ..
        } = self.get_mut();

        loop {
            // this breaks, because Pin::new consume the mutable
            // reference on the first iteration of the loop.
            match Pin::new(data).poll_next(cx) {
                Ready(Some(vec)) => state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }

    // unimplemented, but we basically have the same problem than with
    // `poll_data()`
    fn poll_events(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        unimplemented!()
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if let Ready(_) = self.poll_data(cx) {
            return Ready(());
        }

        // This does not work because self was consumed when
        // self.poll_data() was called.
        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

Есть ли способ исправить этот код? Если нет, то какой шаблон я мог бы использовать для реализации той же логики?


person little-dude    schedule 17.08.2019    source источник
comment
используйте pinned.as_mut(), чтобы не использовать закрепленное значение   -  person Gurwinder Singh    schedule 17.08.2019


Ответы (1)


Вы можете использовать Pin::as_mut, чтобы избежать потребления Pin.

impl MyFuture {
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        let MyFuture {
            ref mut data,
            ref mut state,
            ..
        } = self.get_mut();

        let mut data = Pin::new(data); // Move pin here
        loop {
            match data.as_mut().poll_next(cx) {   // Use in loop by calling `as_mut()`
                Ready(Some(vec)) => state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }
}

и в будущем подразумевает:

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        // `as_mut()` here to avoid consuming
        if let Ready(_) = self.as_mut().poll_data(cx) { 
            return Ready(());
        }

        // can consume here as this is the last invocation
        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

РЕДАКТИРОВАТЬ:

Совет: старайтесь использовать Pin только при необходимости. В вашем случае вам действительно не нужен закрепленный указатель в функции poll_data. &mut self в порядке, что немного снижает Pin использование:

impl MyFuture {
    fn poll_data(&mut self, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        loop {
            match Pin::new(&mut self.data).poll_next(cx) {
                Ready(Some(vec)) => self.state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }
}

и будущее подразумевает:

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if let Ready(_) = self.poll_data(cx) {
            return Ready(());
        }

        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}
person Gurwinder Singh    schedule 17.08.2019
comment
Аллилуиа! Теперь, когда я это вижу, это кажется очевидным, но я пробовал много комбинаций с as_mut() и get_mut() раньше, но мне не удалось заставить его работать. - person little-dude; 17.08.2019