У меня сложилось впечатление, что flock(2) является потокобезопасным, недавно я наткнулся на случай в коде, когда несколько потоков могут получить блокировку одного и того же файла, и все они синхронизированы с использованием получения монопольной блокировки с помощью стаи c api. Процесс 25554 — это многопоточное приложение с 20 потоками, количество потоков, блокирующих один и тот же файл, меняется, когда возникает взаимоблокировка. Многопоточное приложение testEvent выполняет запись в файл, а push — чтение из файла. К сожалению, lsof
не печатает значение LWP, поэтому я не могу найти, какие потоки удерживают блокировку. Когда происходит указанное ниже условие, и процесс, и потоки застревают в вызове flock, что отображается вызовом pstack
или strace
для pid 25569 и 25554. Любые предложения о том, как преодолеть это в RHEL 4.x.
Одна вещь, которую я хотел обновить, это то, что flock не ведет себя неправильно все время, когда скорость передачи сообщений превышает 2 Мбит / с, только тогда я попадаю в эту проблему тупика с flock, ниже этой скорости передачи все является файлом. Я сохранил константу num_threads
= 20, size_of_msg
= 1000 байт и просто изменил количество сообщений, передаваемых в секунду, начиная с 10 сообщений до 100 сообщений, что составляет 20 * 1000 * 100 = 2 Мбит / с, когда я увеличиваю количество сообщений до 150, тогда происходит проблема со стадом.
Я просто хотел спросить, что вы думаете о flockfile c api.
sudo lsof filename.txt
COMMAND PID USER FD TYPE DEVICE SIZE NODE NAME
push 25569 root 11u REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 27uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 28uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 29uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 30uW REG 253.4 1079 49266853 filename.txt
Многопоточная тестовая программа, которая вызовет функцию write_data_lib_func
lib.
void* sendMessage(void *arg) {
int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
while(!terminateTest) {
Record *er1 = Record::create();
er1.setDate("some data");
for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
ec = _write_data_lib_func(*er1);
if( ec != SUCCESS) {
std::cout << "write was not successful" << std::endl;
}
}
delete er1;
sleep(1);
}
return NULL;
Вышеупомянутый метод будет вызываться в pthreads в основной функции теста.
for (i=0; i<_numThreads ; ++i) {
rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
assert(0 == rc);
}
Вот источник записи / чтения, из-за проприетарных причин я не хотел просто вырезать и вставлять, источник записи будет иметь доступ к нескольким потокам в процессе.
int write_data_lib_func(Record * rec) {
if(fd == -1 ) {
fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
}
if ( fd >= 0 ) {
/* some code */
if( flock(fd, LOCK_EX) < 0 ) {
print "some error message";
}
else {
if( maxfilesize) {
off_t len = lseek ( fd,0,SEEK_END);
...
...
ftruncate( fd,0);
...
lseek(fd,0,SEEK_SET);
} /* end of max spool size */
if( writev(fd,rec) < 0 ) {
print "some error message" ;
}
if(flock(fd,LOCK_UN) < 0 ) {
print some error message;
}
Со стороны читателя это процесс-демон без потоков.
int readData() {
while(true) {
if( fd == -1 ) {
fd= open (filename,O_RDWR);
}
if( flock (fd, LOCK_EX) < 0 ) {
print "some error message";
break;
}
if( n = read(fd,readBuf,readBufSize)) < 0 ) {
print "some error message" ;
break;
}
if( off < n ) {
if ( off <= 0 && n > 0 ) {
corrupt_file = true;
}
if ( lseek(fd, off-n, SEEK_CUR) < 0 ) {
print "some error message";
}
if( corrupt_spool ) {
if (ftruncate(fd,0) < 0 ) {
print "some error message";
break;
}
}
}
if( flock(fd, LOCK_UN) < 0 )
print some error message ;
}
}
}
flock
? Неплохо было бы использовать простую тестовую программу. - person phihag   schedule 27.02.2012