многопоточный потребитель, код производителя C, не работает?

Я пытаюсь создать простой многопоточный потребитель/производитель, в котором несколько потоков чтения и записи читают из файла в буфер, а затем из буфера обратно в файл. Он должен быть потокобезопасным. однако он работает не так, как я ожидал. Он останавливается на полпути, но каждый раз на другой линии? Пожалуйста, помогите мне понять, что я делаю неправильно?!?

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
//TODO Define global data structures to be used
#define BUF_SIZE 5
FILE *fr;
FILE *to;            /* declare the file pointer */

struct _data {
    pthread_mutex_t mutex;
    pthread_cond_t cond_read;
    pthread_cond_t cond_write;
    int condition;
    char buffer[BUF_SIZE];
    int datainbuffer;
}dc1 = {
    PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER,PTHREAD_COND_INITIALIZER,0,{0},0
};


void *reader_thread(void *arg) {
    //TODO: Define set-up required
    struct _data *d = (struct _data *)arg;
    int killreaders = 0;
    while(1) {
        //TODO: Define data extraction (queue) and processing

        pthread_mutex_lock(&d->mutex);

        while (d->condition == 0 || d->datainbuffer<=0){
            pthread_cond_wait( &d->cond_read, &d->mutex );
            if(killreaders == 1){
                pthread_mutex_unlock(&d->mutex);
                pthread_cond_signal(&d->cond_read);
                pthread_cond_signal(&d->cond_write);
                return NULL;
            }

        }
        d->condition = 0;

        int i;
        char res;
        //if the buffer is not full, that means the end of file is reached and it time to kill the threads remaining.
        if(d->datainbuffer!=BUF_SIZE)
            killreaders = 1;

        for (i=0; i<(sizeof d->datainbuffer); i++) {
            res = d->buffer[i];
            printf("to file:%c",res);
            fputc(res, to);
        }
        d->datainbuffer = 0;


        pthread_mutex_unlock(&d->mutex);
        pthread_cond_signal( &d->cond_write );


    }

    return NULL;
}

void *writer_thread(void *arg) {
    //TODO: Define set-up required
    struct _data *d = (struct _data *)arg;
    char * pChar;
    int killwriters = 0;

    while(1){
        pthread_mutex_lock(&d->mutex);
        while( d->condition == 1 || d->datainbuffer>0){
            pthread_cond_wait( &d->cond_write, &d->mutex );
            if(killwriters==1){
                pthread_mutex_unlock(&d->mutex);
                pthread_cond_signal(&d->cond_write);
                pthread_cond_signal(&d->cond_read);
                return NULL;

            }
        }
        d->condition = 1;
        int i;
        char rc;
        for (i = 0; i < BUF_SIZE; i++){
            if((rc = getc(fr)) == EOF){
                killwriters = 1;
                pthread_mutex_unlock(&d->mutex);
                pthread_cond_signal(&d->cond_read);

                return NULL;
            }
            d->datainbuffer = i+1;
            d->buffer[i] = rc;
            printf("%c",rc);
        }

        int m = 0;

        pthread_mutex_unlock(&d->mutex);
        pthread_cond_signal(&d->cond_read);


    }


    return NULL;
}


#define M 10
#define N 20
int main(int argc, char **argv) {
    struct _data dc=dc1;

    fr = fopen ("from.txt", "rt");  /* open the file for reading */
    if (fr == NULL)
    {
        printf("Could not open file!");
        return 1;
    }
    to = fopen("to.txt", "wt");


    int i;
    pthread_t readers[N];
    pthread_t writers[M];


    for(i = 0; i < N; i++) { 
        pthread_create(&readers[i], NULL, reader_thread, (void*)&dc);
    }

    for(i = 0; i < M; i++) { 
        pthread_create(&writers[i], NULL, writer_thread, (void*)&dc);
    }
    fclose(fr);
    fclose(to);

    return 0;   
}

любое предложение приветствуется!


person user1127217    schedule 04.01.2012    source источник
comment
Не могли бы вы обновить внесенные вами изменения, это поможет мне, поскольку я смотрю на аналогичную проблему.   -  person m4n07    schedule 25.10.2012


Ответы (3)


Ваши потоки читают и пишут в файлы, которые вы открываете и закрываете в main. Но main явно не ждет завершения потоков перед закрытием этих файлов.

person Scott Hunter    schedule 04.01.2012

В дополнение к проблеме, указанной Скоттом Хантером, ваши читатели и писатели выполняют всю свою «настоящую работу», удерживая мьютекс, что в первую очередь устраняет необходимость иметь более одного потока.

Читатели должны действовать следующим образом:

1) Получить мьютекс.
2) Заблокировать переменную условия до тех пор, пока работа не будет доступна.
3) Удалить работу из очереди, возможно, переменную условия сигнала.
4) Освободить мьютекс.
5) Обработать работа.
6) Перейдите к шагу 1.

Писатели должны действовать следующим образом:

1) Получить информацию, которую нам нужно записать.
2) Захватить мьютекс.
3) Заблокировать переменную условия, пока в очереди не останется места.
4) Поместить информацию в очередь, возможно, сигнал переменная условия.
5) Освободить мьютекс.
6) Перейти к шагу 1.

Обратите внимание, что оба потока выполняют «настоящую работу», не удерживая мьютекс? В противном случае, зачем иметь несколько потоков, если только один из них может работать одновременно?

person David Schwartz    schedule 04.01.2012
comment
Потому что там условия гонки? Или вы говорите, что получение мьютекса не нужно, b/c cond_wait сделает это за вас? -- Неважно (я читал списки должно работать, как работает :) - person Scott Hunter; 04.01.2012
comment
спасибо, это было действительно полезно! Я воспользовался вашим советом, и теперь он работает намного лучше. - person user1127217; 05.01.2012

Я не уверен, поможет ли мой ответ вам или нет ... но я сделаю все возможное, дав вам справочный код.

Я написал аналогичную программу (за исключением того, что она не записывает в файл, а вместо этого отображает очередь-/произведенные-/потребленные-элементы в stdout). Его можно найти здесь — https://github.com/sangeeths/pc. Я разделил обработку командной строки и логику очереди в отдельные файлы.

Надеюсь это поможет!

person Sangeeth Saravanaraj    schedule 04.01.2012
comment
Я использовал реализацию очереди из вашего кода, так как добавлять и удалять элементы из нее намного проще, спасибо! - person user1127217; 05.01.2012
comment
@user1127217 user1127217 Я рад, что вы можете повторно использовать часть моего кода. Если вы найдете это полезным, пожалуйста, не стесняйтесь голосовать и выбирать это как ответ. Также, пожалуйста, поделитесь своим кодом, чтобы я мог просмотреть и узнать, насколько он отличается от моего. Спасибо! - person Sangeeth Saravanaraj; 05.01.2012