несколько потоков могут получить стадо одновременно

У меня сложилось впечатление, что flock(2) является потокобезопасным, недавно я наткнулся на случай в коде, когда несколько потоков могут получить блокировку одного и того же файла, и все они синхронизированы с использованием получения монопольной блокировки с помощью стаи c api. Процесс 25554 — это многопоточное приложение с 20 потоками, количество потоков, блокирующих один и тот же файл, меняется, когда возникает взаимоблокировка. Многопоточное приложение testEvent выполняет запись в файл, а push — чтение из файла. К сожалению, lsof не печатает значение LWP, поэтому я не могу найти, какие потоки удерживают блокировку. Когда происходит указанное ниже условие, и процесс, и потоки застревают в вызове flock, что отображается вызовом pstack или strace для pid 25569 и 25554. Любые предложения о том, как преодолеть это в RHEL 4.x.

Одна вещь, которую я хотел обновить, это то, что flock не ведет себя неправильно все время, когда скорость передачи сообщений превышает 2 Мбит / с, только тогда я попадаю в эту проблему тупика с flock, ниже этой скорости передачи все является файлом. Я сохранил константу num_threads = 20, size_of_msg = 1000 байт и просто изменил количество сообщений, передаваемых в секунду, начиная с 10 сообщений до 100 сообщений, что составляет 20 * 1000 * 100 = 2 Мбит / с, когда я увеличиваю количество сообщений до 150, тогда происходит проблема со стадом.

Я просто хотел спросить, что вы думаете о flockfile c api.

 sudo lsof filename.txt
    COMMAND       PID     USER     FD       TYPE     DEVICE     SIZE   NODE       NAME
    push         25569    root     11u       REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     27uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     28uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     29uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     30uW      REG      253.4      1079   49266853   filename.txt

Многопоточная тестовая программа, которая вызовет функцию write_data_lib_func lib.

void* sendMessage(void *arg)  {

int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
 while(!terminateTest) {
   Record *er1 = Record::create();
   er1.setDate("some data");

   for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
     ec = _write_data_lib_func(*er1);
     if( ec != SUCCESS) {
       std::cout << "write was not successful" << std::endl;

     }

   }
   delete er1;
   sleep(1);
 }

 return NULL;

Вышеупомянутый метод будет вызываться в pthreads в основной функции теста.

for (i=0; i<_numThreads ; ++i) {
  rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
  assert(0 == rc);

}

Вот источник записи / чтения, из-за проприетарных причин я не хотел просто вырезать и вставлять, источник записи будет иметь доступ к нескольким потокам в процессе.

int write_data_lib_func(Record * rec) {      
if(fd == -1 ) {  
    fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
} 
if ( fd >= 0 ) {
   /* some code */ 

   if( flock(fd, LOCK_EX) < 0 ) {
     print "some error message";
   }
   else { 
    if( maxfilesize) {
      off_t len = lseek ( fd,0,SEEK_END);
      ...
      ... 
      ftruncate( fd,0);
      ...
      lseek(fd,0,SEEK_SET); 
   } /* end of max spool size */ 
   if( writev(fd,rec) < 0 ) {
     print "some error message" ; 
   }

   if(flock(fd,LOCK_UN) < 0 ) {
   print some error message; 
   } 

Со стороны читателя это процесс-демон без потоков.

int readData() {
    while(true) {
      if( fd == -1 ) {
         fd= open (filename,O_RDWR);
      }
      if( flock (fd, LOCK_EX) < 0 ) { 
        print "some error message"; 
        break; 
      } 
      if( n = read(fd,readBuf,readBufSize)) < 0 ) { 
        print "some error message" ;
        break;
      }  
      if( off < n ) { 
        if ( off <= 0 && n > 0 ) { 
          corrupt_file = true; 
        } 
        if ( lseek(fd, off-n, SEEK_CUR) < 0 ) { 
          print "some error message"; 
        } 
        if( corrupt_spool ) {  
          if (ftruncate(fd,0) < 0 ) { 
             print "some error message";
             break;
           }  
        }
      }
      if( flock(fd, LOCK_UN) < 0 ) 
       print some error message ;
      }  
   }     
}

person user1235176    schedule 27.02.2012    source источник
comment
Можете ли вы опубликовать код, который вызывает flock? Неплохо было бы использовать простую тестовую программу.   -  person phihag    schedule 27.02.2012
comment
Насколько я знаю, рекомендательные блокировки не гарантируют согласованности. gsp.com/cgi-bin/man.cgi?section= 2&topic=стадо   -  person zengr    schedule 27.02.2012


Ответы (2)


flock(2) задокументировано как "блокировка, если несовместимая блокировка удерживается другим < em>process" и "блокировки, созданные flock(), связаны с записью в таблице открытых файлов", поэтому следует ожидать, что flock-ed блокировки несколькими потоками одного и того же процесса не будут взаимодействовать. (в документации flock не упоминаются потоки).

Следовательно, решение должно быть для вас простым: свяжите один pthread_mutex_t с каждым файловым дескриптором, поддерживающим flock, и защитите вызов flock с помощью этого мьютекса. Вы также можете использовать pthread_rwlock_t, если вам нужна блокировка чтения и записи.

person Basile Starynkevitch    schedule 27.02.2012
comment
спасибо за ваш ответ, я уверен, что в той же строке я думал связать мьютекс с flock, что меня беспокоило, так это то, что flock (2) находится поверх fcntl (2), а fcntl (2) lockf (2) утверждал поток безопасно, поэтому для меня было неожиданностью, что это не так. - person user1235176; 28.02.2012
comment
Я попробовал это предложение сейчас, есть дополнительный pthread_mutex_t для синхронизации вызовов flock, теперь происходит 19 потоков, пытающихся получить pthread_mutex_t, и 1 поток не может получить монопольную блокировку файла с помощью flock, и он застрял на строка flock(fd,LOCK_EX); - person user1235176; 29.02.2012
comment
и что вы получаете от стаи, которая не удалась? - person zoska; 18.03.2016

На справочной странице Linux для flock(2):

Блокировки, созданные функцией flock(), связаны с записью таблицы открытых файлов. Это означает, что повторяющиеся файловые дескрипторы (созданные, например, fork(2) или dup(2)) ссылаются на одну и ту же блокировку, и эта блокировка может быть изменена или снята с помощью любого из этих дескрипторов. Кроме того, блокировка снимается либо явной операцией LOCK_UN на любом из этих повторяющихся дескрипторов, либо когда все такие дескрипторы закрыты.

Кроме того, блокировки flock не «стекаются», поэтому, если вы попытаетесь получить блокировку, которую вы уже удерживаете, вызов flock представляет собой noop, который немедленно возвращается без блокировки и без какого-либо изменения состояния блокировки.

Поскольку потоки внутри процесса совместно используют файловые дескрипторы, вы можете собирать файл несколько раз из разных потоков, и он не будет блокироваться, поскольку блокировка уже удерживается.

Также из примечаний к flock(2):

Блокировки flock() и fcntl(2) имеют различную семантику по отношению к разветвленным процессам и dup(2). В системах, реализующих flock() с использованием fcntl(2), семантика flock() будет отличаться от описанной на этой странице руководства.

person Chris Dodd    schedule 28.02.2012
comment
Это неправда, Крис, если вы видите мой вывод lsof, дескрипторы файлов уникальны для каждого потока. - person user1235176; 28.02.2012
comment
Потоки внутри процесса совместно используют файловые дескрипторы. См. stackoverflow.com/questions/6223776/ РЕДАКТИРОВАТЬ - -- И я думаю, что большинство из нас, кто писал многопоточные программы, написали рабочий код, в котором файловые дескрипторы совместно используются потоками. - person Bob Stine; 10.05.2013