Начать несколько потоков с Tokio

Я пытаюсь создать базовый tcp-сервер:

  1. Сервер должен иметь возможность транслировать поток сообщений всем подключенным клиентам.
  2. Сервер должен иметь возможность получать команды от всех клиентов и обрабатывать их.

Вот что у меня есть в моей main функции:

let (server_tx, server_rx) = mpsc::unbounded();
let state = Arc::new(Mutex::new(Shared::new(server_tx)));

let addr = "127.0.0.1:6142".parse().unwrap();

let listener = TcpListener::bind(&addr).unwrap();

let server = listener.incoming().for_each(move |socket| {
    // Spawn a task to process the connection
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

println!("server running on localhost:6142");

let _messages = server_rx.for_each(|_| {
    // process messages here
    Ok(())
}).map_err(|err| {
    println!("message error = {:?}", err);
});

tokio::run(server);  

(площадка)

Я использую пример chat.rs из репозитория tokio в качестве основы.
Я отправляю данные на server_tx о входящих сообщениях TCP.
У меня проблемы с их потреблением.
Я «потребляю» поток входящих сообщений с использованием server_rx.for_each(|_| {, как мне сказать tokio запустить его?

tokio::run принимает одно будущее, но у меня их два (а возможно, и больше). Как их объединить, чтобы они работали параллельно?


person Leonti    schedule 29.10.2018    source источник


Ответы (1)


Объединяет будущее вместе:

let messages = server_rx.for_each(|_| {
    println!("Message broadcasted");
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

tokio::run(server.join(messages).map(|_| ()));

Комбинатор map() необходим, потому что Join Item связанный тип является кортежем ((), ()) и tokio::run() потребляют будущую задачу, для которой требуется Future::Item типа ()

person attdona    schedule 29.10.2018