проблема производитель-потребитель с pthreads

Я пытаюсь решить проблему производитель-потребитель, используя pthreads и семафоры, но похоже, что потоки-производители не производят, а потоки-потребители не потребляют. Похоже, что потоки создаются:

  /* Do actual work from this point forward */
  /* Create the producer threads */
  for(c1=1; c1<=argarray[1]; c1++)
  {
    pthread_create(&tid, &attr, producer, NULL);
    printf("Creating producer #%d\n", c1);    
  }

  /* Create the consumer threads */
  for(c1=1; c1<=argarray[2]; c1++)
  {
    pthread_create(&tid, &attr, consumer, NULL);
    printf("Creating consumer #%d\n", c1);    
  }

потому что «Создание производителя № x» и «Создание потребителя № x» выводятся на экран. Однако он не печатает изнутри самих потоков:

if(insert_item(item))
{
  fprintf(stderr, "Producer error.");
}
else
{
  printf("Producer produced %d\n", item);
}

аналогично для потребительских потоков. Полный код:

#include "buffer.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>

/* Create Buffer */
buffer_item buffer[BUFFER_SIZE];

/* Semaphore and Mutex lock */
sem_t cEmpty;
sem_t cFull;
pthread_mutex_t mutex;

/* Threads */
pthread_t tid; /* Thread ID */
pthread_attr_t attr; /* Thread attributes */

void *producer(void *param);
void *consumer(void *param);
void init();

/* Progress Counter */
int cg;

main(int argc, char *argv[])
{
  /* Variables */
  int argarray[3], c1;

  /* Argument counter checks */
  if(argc != 4)
  {
    fprintf(stderr, "usage: main [sleep time] [# of producer threads] [# of consumer threads]\n");
    return -1;
  }

  /* Get args from command line and change them into integers */
  argarray[0] = atoi(argv[1]); /* How long to sleep before ending */
  argarray[1] = atoi(argv[2]); /* Producer threads */
  argarray[2] = atoi(argv[3]); /* Consumer threads */

  /* Error check */
  if(argarray[1]<1)
  {
    fprintf(stderr, "argument 2 must be > 0\n");
    return -1;
  }
  if(argarray[2]<1)
  {
    fprintf(stderr, "argument 3 must be > 0\n");
    return -1;
  }    

  init();

  /* Do actual work from this point forward */
  /* Create the producer threads */
  for(c1=1; c1<=argarray[1]; c1++)
  {
    pthread_create(&tid, &attr, producer, NULL);
    printf("Creating producer #%d\n", c1);    
  }

  /* Create the consumer threads */
  for(c1=1; c1<=argarray[2]; c1++)
  {
    pthread_create(&tid, &attr, consumer, NULL);
    printf("Creating consumer #%d\n", c1);    
  }

  /* Ending it */
  sleep(argarray[0]);

  printf("Production complete.\n");
  exit(0);
}

void init()
{
  int c2;

  pthread_mutex_init(&mutex, NULL); /* Initialize mutex lock */
  pthread_attr_init(&attr); /* Initialize pthread attributes to default */
  sem_init(&cFull, 0, 0); /* Initialize full semaphore */
  sem_init(&cEmpty, 0, BUFFER_SIZE); /* Initialize empty semaphore */
  cg = 0; /* Initialize global counter */ 
  for(c2=0;c2<BUFFER_SIZE;c2++) /* Initialize buffer */
  {
    buffer[c2] = 0;
  }
}

void *producer(void *param)
{
  /* Variables */
  buffer_item item;

  while(1)
  { 
    sleep(rand());      
    item = (rand()); /* Generates random item */ 

    sem_wait(&cEmpty); /* Lock empty semaphore if not zero */
    pthread_mutex_lock(&mutex);

    if(insert_item(item))
    {
      fprintf(stderr, "Producer error."); 
    }
    else
    {
      printf("Producer produced %d\n", item); 
    }

    pthread_mutex_unlock(&mutex);
    sem_post(&cFull); /* Increment semaphore for # of full */
  }
}

void *consumer(void *param)
{
  buffer_item item;

  while(1)
  {
    sleep(rand());
    sem_wait(&cFull); /* Lock empty semaphore if not zero */
    pthread_mutex_lock(&mutex);
    if(remove_item(&item))
    {
      fprintf(stderr, "Consumer error."); 
    }
    else
    {
      printf("Consumer consumed %d\n", item);
    }

    pthread_mutex_unlock(&mutex);
    sem_post(&cEmpty); /* Increments semaphore for # of empty */
  }
}

int insert_item(buffer_item item)
{
  if(cg < BUFFER_SIZE) /* Buffer has space */
  {
    buffer[cg] = item;
    cg++;
    return 0;
  }
  else /* Buffer full */
  {
    return -1;
  }
}

int remove_item(buffer_item *item)
{
  if(cg > 0) /* Buffer has something in it */
  {
    *item = buffer[(cg-1)];
    cg--;
    return 0;
  }
  else /* Buffer empty */
  {
    return -1;
  }
}

Выход терминала:

user@isanacom:~/Desktop/PCthreads$ ./main 10 10 10
Creating producer #1
Creating producer #2
Creating producer #3
Creating producer #4
Creating producer #5
Creating producer #6
Creating producer #7
Creating producer #8
Creating producer #9
Creating producer #10
Creating consumer #1
Creating consumer #2
Creating consumer #3
Creating consumer #4
Creating consumer #5
Creating consumer #6
Creating consumer #7
Creating consumer #8
Creating consumer #9
Creating consumer #10
Production complete.

Как новичок в многопоточности, я уверен, что это, вероятно, что-то простое, что я упускаю из виду, и я ценю помощь.


person Mike    schedule 19.05.2011    source источник


Ответы (3)


И потребитель, и производитель выполняют sleep(rand()), который будет спать в течение случайного числа секунд от 0 до MAX_INT, в приведенном вами примере основной поток завершится через 10 секунд. Если значение rand() производителей выше 10, у них никогда не будет шанса что-либо произвести.

person Rickard    schedule 19.05.2011
comment
Хорошо поймал. На самом деле, хотя я думаю, что они уже вызвали UB, вызвав rand (который не является потокобезопасным) из более чем одного потока, возможно, одновременно. - person R.. GitHub STOP HELPING ICE; 19.05.2011
comment
Могу ли я использовать аргумент, который я передаю для сна, также для потоков, чтобы решить эту проблему? - person Mike; 19.05.2011
comment
Вы можете попробовать просто заменить sleep(rand()) на sleep(1) - person Rickard; 19.05.2011
comment
Могу ли я вызвать функцию соединения в конце основной функции в качестве решения этой проблемы? - person VaM999; 18.12.2019
comment
@VaM999 VaM999 Да, pthread_join() — это один из способов ожидания завершения потоком того, что он делает. Однако в приведенном выше примере ни потребители, ни производители никогда не завершают работу. - person Rickard; 18.12.2019

Вы должны начать с использования массива потоков...

pthread_t tid[argarray[1] + argarray[2]];

for(c1 = 0; c1 < argarray[1]; c1++)
{
    pthread_create(&tid[c1], &attr, producer, NULL);
    printf("Creating producer #%d\n", c1); 
}
for(c1 = 0; c1 < argarray[2]; c1++)
{
    pthread_create(&tid[c1 + argarray[1]], &attr, consumer, NULL);
    printf("Creating consumer #%d\n", c1); 
}

Могут быть и другие проблемы, но это первая, которую я вижу...

person malcolm    schedule 30.12.2012
comment
Если идентификатор потока не используется в приложении, нет смысла его сохранять. Эффективно сбрасывать его с помощью одного «&tid» во всех циклах создания — это нормально. - person Martin James; 06.03.2013
comment
Возможно даже, что вы можете передать NULL в качестве указателя pthread_t, но я не пробовал. - person Martin James; 06.03.2013

Вы не должны напрямую вызывать exit в конце вашей основной функции. Вы должны сначала вызвать pthread_join для потоков, которые вы создали в конце, чтобы поддерживать основной поток до тех пор, пока другие потоки не умрут.

person gunan    schedule 19.05.2011
comment
... или используйте другие средства предотвращения выхода из основного потока. Присоединение ко всем этим потокам означает сохранение ссылки на них по этой одной причине и цикла join() для них всех. Большая ПИТА. Легче просто дождаться ввода символа или использовать цикл sleep() или что-то в этом роде, чем беспорядочную штуку join(). - person Martin James; 06.03.2013