Как я могу дождаться завершения неизвестного количества потоков Rust без использования дескрипторов отслеживания?

Какие есть хорошие способы адаптировать этот Barrier пример для обработки двух различий:

  1. количество элементов заранее неизвестно (например, в случае разбиения большого файла на строки)

  2. без отслеживания дескрипторов потока (например, без использования вектора handles в приведенном ниже примере). Мотивация заключается в том, что это добавляет дополнительные накладные расходы.

Пример кода:

use std::sync::{Arc, Barrier};
use std::thread;

let mut handles = Vec::with_capacity(10);
let barrier = Arc::new(Barrier::new(10));
for _ in 0..10 {
    let c = barrier.clone();
    handles.push(thread::spawn(move|| {
        // do some work
        c.wait();
    }));
}
// Wait for other threads to finish.
for handle in handles {
    handle.join().unwrap();
}

Фрагмент кода немного адаптирован из Barrier docs.

Первое, что пришло мне в голову, это (если возможно) изменить внутреннее значение Barrier; однако API не предоставляет изменяемый доступ к свойству num_threads структуры Barrier.

Другая идея заключалась бы в том, чтобы не использовать Barrier и вместо этого писать пользовательскую логику с AtomicUsize.

Я открыт для изучения самых эргономичных/идиоматических способов сделать это в Rust.


person David J.    schedule 05.07.2017    source источник
comment
Хотите спросить, как синхронизировать потоки, не отслеживая механизмы синхронизации потоков? Вы должны где-то хранить эти ручки, если хотите присоединиться к ним позже. Поскольку размеры векторов могут изменяться, что плохого в создании и сохранении такого переменного количества дескрипторов и экземпляров барьера?   -  person E_net4 the curator    schedule 05.07.2017


Ответы (1)


Вы можете использовать спин-блокировку на atomic для ожидания выхода всех потоков. Конечно, вместо использования static atomic вы можете передавать Arc<AtomicUsize> в каждый поток.

Ordering::SeqCst, вероятно, слишком сильный, но параллельное программирование сложно, и я не уверен, как можно смягчить этот порядок.

Хотя это можно сделать таким образом, стоимость создания потоков, вероятно, превзойдет такую ​​микрооптимизацию. Также стоит учитывать, что занятое ожидание может снизить производительность программы.

use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::thread;
use std::time::Duration;

static GLOBAL_THREAD_COUNT: AtomicUsize = ATOMIC_USIZE_INIT;

fn main() {
    for i in 0..10 {
        // mark that the thread is about to run
        // we need to do it in the main thread to prevent spurious exits
        GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::SeqCst);
        thread::spawn(move|| {
            // We need to catch panics to reliably signal exit of a thread
            let result = panic::catch_unwind(move || {
                // do some work
                println!("{}-th thread reporting", i+1);
            });
            // process errors
            match result {
                _ => {}
            }
            // signal thread exit
            GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::SeqCst);
        });
    }
    // Wait for other threads to finish.
    while GLOBAL_THREAD_COUNT.load(Ordering::SeqCst) != 0 {
        thread::sleep(Duration::from_millis(1)); 
    }
}

ссылка на игровую площадку

person red75prime    schedule 05.07.2017