Создание задач с нестатическим временем жизни с помощью tokio 0.1.x

У меня есть ядро ​​tokio, основная задача которого - запуск веб-сокета (клиента). Когда я получаю сообщения от сервера, я хочу выполнить новую задачу, которая обновит некоторые данные. Ниже приведен минимальный пример неудачи:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}

Что вызывает эту ошибку:

error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
  --> src/main.rs:13:21                                                                                                                                                                
   |                                                                                                                                                                                   
13 |         self.handle.spawn(future::ok(()).and_then(|x| {                                                                                                                           
   |                     ^^^^^                                                                                                                                                         
   |                                                                                                                                                                                   
   = note: type must satisfy the static lifetime      

Проблема в том, что новые задачи, порождаемые дескриптором, должны быть статическими. Та же проблема описана здесь. К сожалению, мне неясно, как я могу решить эту проблему. Даже несколько попыток с and Arc и Mutex (которые действительно не должны быть нужны для однопоточного приложения), я не увенчался успехом.

Поскольку разработки в среде токио происходят довольно быстро, мне интересно, какое сейчас лучшее решение. Есть ли у вас какие-либо предложения?

изменить

Решение от Питера Холла работает для приведенного выше примера. К сожалению, когда я построил неудачный пример, я изменил реактор tokio, думая, что они будут похожи. Использование tokio::runtime::current_thread

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        let mut data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

fn main() {
    // let mut runtime = Core::new().unwrap();

    let mut runtime = Builder::new().build().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: Rc::new(Cell::new(1)),
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.block_on(task).unwrap();
}

Я получаю:

error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21                                                         
|                                                                            
17 |         self.handle.spawn(future::ok(()).and_then(move |_x| {              
|                     ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|                                                                            
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`

Так что, похоже, в этом случае мне нужны Arc и Mutex, хотя весь код однопоточный?


person Ben Ruijl    schedule 20.12.2018    source источник
comment
Пожалуйста, не обновите свой вопрос, на который вы уже ответили, чтобы задать новые вопросы. Вместо этого найдите время, чтобы создать улучшенный минимальный воспроизводимый пример, задайте новый вопрос, объясните, чем эти два вопроса различаются , и, возможно, связь между ними.   -  person Shepmaster    schedule 21.12.2018


Ответы (1)


В однопоточной программе вам не нужно использовать Arc; Rc достаточно:

use std::{rc::Rc, cell::Cell};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

Дело в том, что вам больше не нужно беспокоиться о времени жизни, потому что каждый клон Rc действует так, как будто он владеет данными, а не обращается к ним через ссылку на self. Внутренний Cell (или RefCell для типов, отличных от Copy) необходим, потому что Rc не может быть изменен по разыменованию, поскольку он был клонирован.


spawn метод tokio::runtime::current_thread::Handle требует, чтобы будущее было Send, что и является причиной проблемы в обновлении вашего вопроса. В этой проблеме Tokio Github < / а>.

Вы можете использовать tokio::runtime::current_thread::spawn вместо метода Handle, который всегда будет запускать future в текущем потоке и не требует, чтобы future было Send. Вы можете заменить self.handle.spawn в приведенном выше коде, и он будет работать нормально.

Если вам нужно использовать метод на Handle, вам также нужно будет прибегнуть к Arc и Mutex (или _ 20_), чтобы удовлетворить Send требование:

use std::sync::{Mutex, Arc};

struct Client {
    handle: Handle,
    data: Arc<Mutex<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Arc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            *data.lock().unwrap() += 1;
            future::ok(())
        }));
    }
}

Если ваши данные действительно usize, вы также можете использовать AtomicUsize вместо Mutex<usize>, но я лично считаю его столь же громоздким для работы.

person Peter Hall    schedule 20.12.2018