Синхронизировать потоки для вызова pthread_cond_broadcast

У меня есть простое приложение с потоком «менеджер», который порождает десять простых «рабочих» потоков. Я хочу, чтобы все «рабочие» потоки блокировались одной и той же условной переменной (например, condvar), и я хочу вручную сигнализировать всем десяти потокам о пробуждении в одно и то же время с помощью вызова pthread_cond_broadcast.

В случае с моим приложением потоки могут столкнуться с ошибкой и завершиться досрочно, поэтому возможно, что не все десять потоков дойдут до точки синхронизации.

Одним из простых механизмов может быть создание pthread_barrier_t и вызов pthread_barrier_wait всеми десятью потоками, и когда все десять потоков завершат этот вызов, все они смогут продолжить выполнение. Однако для этого потребуется, чтобы потоки могли изменять количество потоков, требуемых барьером для разблокировки. Я не знаю, можно ли это безопасно изменить.

Кроме того, я хочу гарантировать, что все еще работающие потоки не запускаются автоматически, как в случае с барьером, вместо этого я хочу запускать их вручную с помощью вызова pthread_cond_broadcast. Как я могу гарантировать, что все живые потоки (в идеале десять) заблокируются в condvar до того, как я сделаю широковещательный вызов?

Спасибо!


person Cloud    schedule 17.01.2012    source источник
comment
Я не очень понимаю вашу проблему. Чтобы разбудить всех, кто ждет переменную cond, вы просто используете pthread_cond_broacast. Это разбудит всех, кто еще там. Вы хотите убедиться, что все действительно ждут переменную cond, или в чем ваша проблема?   -  person Jens Gustedt    schedule 18.01.2012
comment
Правильно, я хочу, чтобы все ждали. Проблема в том, что некоторые, возможно, уже вымерли, поэтому я хочу, чтобы все те, кто еще жив, были подожжены.   -  person Cloud    schedule 18.01.2012


Ответы (2)


Ниже показан один из способов сделать это, используя условную переменную и несколько других переменных; хотя могут быть и лучшие способы. Комментарии должны показать, как это работает. Конечно, вам придется изменить вещи, чтобы они соответствовали вашей реальной ситуации; например, могут быть задействованы петли и т. д.

int activeThreads = 0;   /* number of threads currently going */
int waitingThreads = 0;  /* number of threads waiting on the condition */
int readyFlag = 0;       /* flag to tell the threads to proceed when signaled */
pthread_cond_t  cond;    /* condition to wait on / signal */
pthread_mutex_t mtx;     /* mutex for the above */

pthread_cond_t  condWaiting; /* EDIT: additional condition variable to signal 
                              * when each thread starts waiting */

void *threadFunc(void *arg)
{
  /* Edit: Rather than incrementing 'activeThreads' here, it should be done
   * in the main thread when each thread is created (to avoid a race) */

  /* ...do stuff... */

  /* When the threads should wait, do this (they wait for 'readyFlag' to be 
   * set, but also adjust the waiting thread count so the main thread can
   * determine whether to broadcast) */
  pthread_mutex_lock(&mtx);
    if (readyFlag == 0) {
      waitingThreads++;
      do {
        pthread_cond_signal(&condWaiting); /* EDIT: signal the main thread when
                                            * a thread begins waiting */
        pthread_cond_wait(&cond,&mtx);
      } while (readyFlag == 0);
      waitingThreads--;
    }
  pthread_mutex_unlock(&mtx);

  /* ...more stuff... */

  /* When threads terminate, they decrement the active thread count */
  pthread_mutex_lock(&mtx);
    activeThreads--;
    pthread_cond_signal(&condWaiting); /* EDIT: also signal the main thread
                                        * when a thread exits to make it 
                                        * recheck the waiting thread count if
                                        * waiting for all threads to wait */
  pthread_mutex_unlock(&mtx);

  return NULL;
}

int main(int argc, char *argv[])
{
  /* Edit: Showing some code to initialize the mutex, condition variable(s),
   * etc., and create some threads -- modify as needed */
  pthread_mutex_init(&mtx,NULL);
  pthread_cond_init(&cond,NULL);
  pthread_cond_init(&condWaiting,NULL); /* EDIT: if main thread should block
                                         * until all threads are waiting */
  activeThreads = waitingThreads = readyFlag = 0;

  /* Edit: Increment 'activeThreads' here rather than in the thread function,
   * to avoid a race (if the main thread started waiting for the others
   * when not all had incremented the count yet, the main thread might end
   * up waiting for fewer threads to be ready -- though it's unlikely */
  #define NUM_THREADS 10
  pthread_t workers[NUM_THREADS];
  for (int i = 0; i < NUM_THREADS; i++) {
    /* should use appropriate thread attributes */
    if (pthread_create(&workers[i],NULL,threadFunc,NULL) == 0)
      activeThreads++;
  }

  /* ...do stuff... */

  /* Set 'readyFlag' and do condition broadcast IF all threads are waiting,
   * or just carry on if they aren't */
  pthread_mutex_lock(&mtx);
    if ((activeThreads != 0) && (activeThreads == waitingThreads)) {
      readyFlag = 1;
      pthread_cond_broadcast(&cond);
    }
  pthread_mutex_unlock(&mtx);

  /* EDIT: OR.. to wait until all threads are waiting and then broadcast, do 
   * this instead: */
  pthread_mutex_lock(&mtx);
    while (waitingThreads < activeThreads) { /* wait on 'condWaiting' until all
                                              * active threads are waiting */
      pthread_cond_wait(&condWaiting,&mtx);
    }
    if (waitingThreads != 0) {
      readyFlag = 1;
      pthread_cond_broadcast(&cond);
    }
  pthread_mutex_unlock(&mtx);

  /* ...more stuff... */

  /* If needed, you can clear the flag when NO threads are waiting.. */
  pthread_mutex_lock(&mtx);
    if (waitingThreads == 0)
      readyFlag = 0;
  pthread_mutex_unlock(&mtx);

  /* ...even more stuff... */

  return 0;
}

Я бы добавил, однако, что не вижу веских причин делать это, а не просто защищать ресурсы более простым способом.

РЕДАКТИРОВАТЬ: добавлены некоторые вещи в код, показывающие вторую переменную условия, используемую для того, чтобы основной поток ждал, пока все рабочие будут готовы. Измененные части отмечены в комментариях «EDIT:» и могут быть опущены, если они не нужны. Я также исправил состояние гонки, переместив приращение activeThreads из функции потока, и показал инициализацию мьютекса и т. д. (без обработки ошибок).

person Dmitri    schedule 17.01.2012
comment
Я думаю, вам действительно нужна вторая переменная условия, чтобы поток main() (или «главный») ждал готовности рабочих потоков. В противном случае, если рабочие потоки не готовы, когда main() доходит до того места, где он установил бы readyFlag = 1, то это условие никогда не будет сигнализировано, и рабочие потоки никогда не будут продолжаться. - person Michael Burr; 18.01.2012
comment
@MichaelBurr Я думал об этом, но нужно ли это, зависит от того, должен ли main() ждать или просто продолжать другую работу и периодически проводить опросы. Я мог бы отредактировать его в... - person Dmitri; 18.01.2012

Вообще говоря, вы должны просто установить переменную условия (и связанный с ней флаг), когда работа будет готова к работе — обычно нет необходимости ждать, пока потоки заблокируются в переменной условия. Если они «опаздывают», они просто заметят, что флаг уже установлен, и не будут блокировать.

Но если вам действительно нужно дождаться, пока все рабочие потоки не окажутся в точке, где они блокируются по условию var, вы можете использовать комбинацию условных переменных — одну, которая отслеживает, сколько потоков «готово к работе», а другую заставить их начать работу. Какой-то пидо-код:

// manager thread thread

pthread_cond_t pseudo_barrier;
pthread_cond_t pseudo_barrier_complete_cond;
pthread_mutex_t pseudo_barrier_mux;
int pseudo_barrier_counter = NUM_THREADS;
int pseudo_barrier_complete_flag = 0;

void thread_manager(void) 
{
    pthread_cond_init( &pseudo_barrier, NULL);
    pthread_cond_init( &pseudo_barrier_complete_cond, NULL);
    pthread_mutex_init( &pseudo_barrier_mux, NULL);


    for (int i = 0 ; i < NUM_THREADS; ++i) {
        pthread_create( /*... */);
    }

    // wait for threads to 'stage'
    pthread_mutex_lock( &pseudo_barrier_mux);
    while (pseudo_barrier_counter != 0) {
        pthread_cond_wait( &pseudo_barrier, &pseudo_barrier_mux);
    }
    pthread_mutex_unlock( &pseudo_barrier_mux);


    // at this point, all threads have either bailed out or are waiting to go
    // let 'em rip

    pthread_mutex_lock( &pseudo_barrier_mux);
    pseudo_barrier_complete_flag = 1;
    pthread_mutex_unlock( &pseudo_barrier_mux);
    pthread_cond_broadcast( &pseudo_barrier_complete_cond);

    // do whatever else the manager thread needs to do...
}


// worker threads
void* worker_thread(void* context)
{
    int error_result = 0;

    // whatever initialization...
    //  if this thread is going to bail out due to an error, it needs to 
    //  set the `error_result` value appropriately and still drop into the 
    //  following code

    // let the manager know that this thread is waiting (or isn't going to participate)
    pthread_mutex_lock( &pseudo_barrier_mux);
    --pseudo_barrier_counter;

    if (pseudo_barrier_counter == 0) {
        // all other threads are accounted for, let the manager know we're ready
        pthread_cond_signal( &pseudo_barrier);
    }

    // if this thread isn't going to contine because of some error, it's already 
    //  accounted for that fact in the `my_barrier_count`, so we can return here
    //  without preventing the pseudo-barrier from being met.
    if (some_error_occurred) {
        pthread_mutex_lock( &pseudo_barrier_mux);
        return NULL;
    }

    // NOTE: we're still holding pseudo_barrier_mux, so the master thread is still 
    //  blocked, even if we've signaled it - it'll jhave to wait until this 
    //  thread is blocking on `pseudo_barrier_complete_cond`

    while (!pseudo_barrier_complete_flag) {
        pthread_cond_wait( &pseudo_barrier_complete_cond, &pseudo_barrier_mux);
    }
    pthread_mutex_unlock( &pseudo_barrier_mux);


    // do the work...
}

Конечно, представленный псевдокод следует очистить для любого реального использования (включая обработку ошибок), возможно, упаковав все поддерживающие условные переменные, мьютекс и флаги в структуру.

person Michael Burr    schedule 17.01.2012