Почему использование клонированного гипер-клиента с Tokio Futures в блоке цикла даже после разрешения будущего?

У меня есть служба, которая обновляет данные кеша с фиксированным интервалом. Каждые N секунд он будет запускать будущее с использованием цикла (tokio::run(future_update(http_client.clone()))), но он не возвращается в родительскую функцию, в которой было разрешено будущее. Цикл блокируется, и я получаю только одну итерацию.

Когда я создаю новый гипер-HTTP-клиент вместо того, чтобы передавать клонированный, все работает правильно. Не работает Arc<Client> тоже.

pub fn trigger_cache_reload(http_client: Arc<Client<HttpConnector, Body>>) {
    let load_interval_sec = get_load_interval_sec(conf.load_interval_seconds.clone());

    std::thread::spawn(move || loop {
        let http_client = http_client.clone();

        info!("Woke up");
        tokio::run(pipeline(http_client));
        info!(
            "Pipeline run complete. Huuhh Now I need sleep of {} secs. Sleeping",
            load_interval_sec
        );
        std::thread::sleep(std::time::Duration::from_secs(load_interval_sec));
    });
}

fn pipeline(
    client: Arc<Client<HttpConnector, Body>>,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
    let res = fetch_message_payload() //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
        .map_err(Error::from)
        .and_then(|_| {
            //let client = hyper::Client::builder().max_idle_per_host(1).build_http();
            //if i create new client here every time and use it then all working is fine.
            refresh_cache(client) //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
                .map_err(Error::from)
                .and_then(|arg| {
                    debug!("refresh_cache completed");
                    Ok(arg)
                })
        });

    let res = res.or_else(|e| {
        error!("error {:?}", e);
        Ok(())
    });
    Box::new(res)
}

После однократного вызова trigger_cache_reload я получаю сообщение журнала "woke up". Я также получаю "refresh_cache completed" сообщение журнала через некоторое время при успешном завершении future. Я не получаю "sleeping" сообщение журнала с Arc или без него.

Если я каждый раз создаю нового клиента внутри будущего, я могу получать "sleeping" сообщений журнала.


person Jarvis42    schedule 21.08.2019    source источник


Ответы (1)


tokio::run создает совершенно новый цикл событий и пул потоков (реактор + исполнитель) каждый раз, когда вы его вызываете. На самом деле это не то, что вы хотите делать.

Гипер-клиент привяжет свое состояние к предыдущему циклу событий и не сможет добиться прогресса при опросе в новом, поскольку старый цикл событий будет уничтожен после завершения run. Вот почему новый клиент работает, но вы не можете повторно использовать старый.

Здесь есть два решения:

  • Если остальная часть вашего приложения не использует tokio, я бы просто использовал синхронный reqwest :: Клиент. Если вам не нужен большой параллелизм, синхронное решение здесь намного проще.

  • если вы используете tokio, используйте tokio :: spawn внутри другого будущего вместе с tokio_timer: : Тайм-аут для запуска проверок и затем ожидания в течение указанного времени в цикле событий.

Пример async / await

Новая поддержка async / await значительно упрощает написание такого кода.

Этот пример в настоящее время работает только с nightly компилятором с tokio-0.3.0-alpha.2 и текущей hyper веткой master:

[dependencies]
tokio = "0.3.0-alpha.2"
tokio-timer = "0.3.0-alpha.2"
hyper = { git = "https://github.com/hyperium/hyper.git" }
use tokio::timer::Interval;
use hyper::{Client, Uri};

use std::time::Duration;

#[tokio::main]
async fn main() {
    let client = Client::new();
    let second_interval = 120;
    let mut interval = Interval::new_interval(Duration::from_secs(second_interval));
    let uri = Uri::from_static("http://httpbin.org/ip");

    loop {
        let res = Client.get(uri.clone()).await.unwrap();
        // Do what you need to with the response...
        interval.next().await;
    }
}
person theduke    schedule 22.08.2019