Есть ли способ создать генератор асинхронного потока, который дает результат многократного вызова функции?

Я хочу создать программу, которая собирает обновления погоды и представляет их в виде потока. Я хочу вызвать get_weather() в бесконечном цикле с задержкой в ​​60 секунд между завершением и началом.

Упрощенная версия будет выглядеть так:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}

Есть ли способ сделать это легко?

Использование tokio::timer::Interval не будет работать, если get_weather() занимает более 60 секунд:

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|| get_weather())
}

Если это произойдет, немедленно запустится следующая функция. Я хочу, чтобы между предыдущим get_weather() запуском и следующим get_weather() запуском оставалось ровно 60 секунд.


person peku33    schedule 04.11.2019    source источник


Ответы (1)


Используйте stream::unfold, чтобы выйти из "мира будущего" «в« мир потоков ». Нам не нужно никакого дополнительного состояния, поэтому мы используем пустой кортеж:

use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}
% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.085   3085495us
user    0.004   3928us
sys     0.003   3151us

Смотрите также:

person Shepmaster    schedule 04.11.2019
comment
Есть ли способ разрешить передачу переменной в get_weather() по ссылке? Я хочу передать его get_weather_stream(), а затем get_weather() на каждой итерации. Конечно, время жизни результирующего Stream должно зависеть от этой переменной. - person peku33; 05.11.2019
comment
@ peku33 Я не сразу понимаю, почему это не сработает. Возможно, вам стоит попробовать и доложить! - person Shepmaster; 05.11.2019
comment
На первом этапе я добавил <'a>(arg: &'a i32) к обеим функциям. Я также добавил + 'a к get_weather_stream возврату. Однако я получаю closure may outlive current function. Я бы не хотел использовать Arc, Rc и т. Д., Если это возможно. - person peku33; 06.11.2019
comment
@ peku33 похоже, работает - person Shepmaster; 07.11.2019