Как я мог хранить замыкания и использовать их с актерами Actix?

Я пытаюсь использовать Actix для передачи событий захвата через WebSockets и их обработки, используя что-то вроде https://github.com/foochi/how-store-closures-with-actix. Идея заключается в предоставлении библиотеки, которую можно использовать для хранения закрытий (событий) и их запуска при получении текстового сообщения WebSockets.

use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;

struct MyActor {
    handler: Box<Fn(String) + 'static>,
}

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl StreamHandler<Message, ProtocolError> for MyActor {
    fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
        match msg {
            Message::Text(text) => {
                (self.handler)(text)
            },
            _ => panic!(),
        }
    }
}

pub struct Event {
    handler: Box<Fn(String) + 'static>,
}

pub struct EventManager {
    events: Vec<Event>,
}

impl EventManager {

    pub fn new() -> Self {
        Self { events: vec![] }
    }

    pub fn capture<F>(&mut self, function: F)
    where
        F: for<'h> Fn(String) + 'static
    {
        let event = Event { handler: Box::new(function), };
        self.events.push(event);
    }

    pub fn run(&self) {
        let runner = System::new("example");
        let event = &self.events[0];

        Arbiter::spawn(
            Client::new("example")
                .connect()
                .map(|(reader, _writer)| {
                    MyActor::create(|ctx| {
                        MyActor::add_stream(reader, ctx);
                        MyActor { handler: event.handler }
                    });
                })
                .map_err(|err| {})
        );

        runner.run();
    }
}

Моя проблема в том, что у меня такая ошибка:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src/events.rs:48:22
   |
48 |         let event = &self.events[0];
   |                      ^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 46:5...
  --> src/events.rs:46:5
   |
46 | /     pub fn run(&self) {
47 | |         let runner = System::new("example");
48 | |         let event = &self.events[0];
49 | |
...  |
62 | |         runner.run();
63 | |     }
   | |_____^
note: ...so that reference does not outlive borrowed content
  --> src/events.rs:48:22
   |
48 |         let event = &self.events[0];
   |                      ^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/events.rs:54:37: 57:22 reader:actix_web::ws::ClientReader, event:&&events::Event]` will meet its required lifetime bounds
  --> src/events.rs:54:21
   |
54 |                     MyActor::create(|ctx| {
   |                     ^^^^^^^^^^^^^^^

Я думаю, что частично понимаю основную причину: я пытаюсь передать ссылку (событие) на StreamHandler, но время жизни не совпадает.

Как я мог это исправить?


person Deveres    schedule 09.10.2018    source источник


Ответы (1)


Отказ от ответственности: я не могу комментировать, подходит ли это шаблон дизайна для Actix, поскольку я только начинаю понимать каркас.

Как вы уже заметили, проблема связана с требованиями к сроку службы.

Метод Actor::create требует 'static времени жизни для аргумент закрытия:

fn create<F>(f: F) -> Addr<Self> 
where
    Self: Actor<Context = Context<Self>>,
    F: FnOnce(&mut Context<Self>) -> Self + 'static, 

&self.events[0] не удовлетворяет требованиям 'static срока службы.

Одно из решений - передать владение объектом EventManager MyActor:

use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;

struct MyActor {
    evm: EventManager,
}

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl StreamHandler<Message, ProtocolError> for MyActor {
    fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
        match msg {
            Message::Text(text) => {
                // just for sake of demo: execute all event handlers
                for idx in 0..self.evm.events.len() {
                    (self.evm.events[idx].handler)(text.clone())
                }
            }
            _ => panic!(),
        }
    }
}

pub struct Event {
    handler: Box<Fn(String) + 'static>,
}

pub struct EventManager {
    events: Vec<Event>,
}

impl EventManager {
    pub fn new() -> Self {
        Self { events: vec![] }
    }

    pub fn capture<F>(&mut self, function: F)
    where
        F: Fn(String) + 'static,
    {
        let event = Event {
            handler: Box::new(function),
        };
        self.events.push(event);
    }

    pub fn run(self) {
        let runner = System::new("example");

        Arbiter::spawn(
            Client::new("http://127.0.0.1:8080/ws/")
                .connect()
                .map(|(reader, _writer)| {
                    MyActor::create(|ctx| {
                        MyActor::add_stream(reader, ctx);

                        // move event manager inside the actor
                        MyActor { evm: self }
                    });
                }).map_err(|err| println!("FATAL: {}", err)),
        );

        runner.run();
    }
}

pub fn ready() {
    let mut client = EventManager::new();

    client.capture(|data| println!("processing the data: {:?}", data));
    client.capture(|data| println!("processing AGAIN the data: {:?}", data));
    client.run();

    // here run client is not more available: it was moved inside the actor
}

fn main() {
    ready();
}
person attdona    schedule 10.10.2018
comment
Спасибо @attdona! Не только ваш ответ решает мою проблему, но теперь я научился по-другому рассуждать о жизнях и владении. - person Deveres; 13.10.2018