Срок действия соединений RPostgreSQL истекает, как только они инициируются с помощью doParallel clusterEvalQ.

Я пытаюсь настроить параллельную задачу, в которой каждому работнику нужно будет делать запросы к базе данных. Я пытаюсь настроить каждого рабочего с подключением, как показано в этом вопросе, но каждый раз, когда я пытаюсь это сделать возвращает <Expired PostgreSQLConnection:(2781,0)> для любого количества зарегистрированных рабочих.

Вот мой код:

cl <- makeCluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(RPostgreSQL)
  drv<-dbDriver("PostgreSQL")
  con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")

})

Если я попытаюсь запустить свой foreach, несмотря на ошибку, произойдет сбой с task 1 failed - "expired PostgreSQLConnection"

Когда я захожу в статус сервера postgres, он показывает все созданные активные сеансы.

У меня нет проблем с взаимодействием с postgres из моего основного экземпляра R.

Если я побегу

clusterEvalQ(cl, {
  library(RPostgreSQL)
  drv<-dbDriver("PostgreSQL")
  con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
  dbGetQuery(con, "select inet_client_port()")

})

затем он вернет все клиентские порты. Это не дает мне уведомление об истечении срока действия, но если я попытаюсь запустить команду foreach, она завершится с той же ошибкой.

Редактировать:

Я пробовал это на Ubuntu и 2 компьютерах с Windows, все они дают одну и ту же ошибку.

Другое редактирование:

Теперь 3 компьютера с Windows


person Dean MacGregor    schedule 24.06.2015    source источник
comment
Можете ли вы также добавить свой код foreach в сообщение?   -  person Jellen Vermeir    schedule 28.06.2015
comment
@JellenVermeir терпит неудачу при любой команде dbGetQuery независимо от того, что еще находится в foreach. Например, foreach(i=1:4) %dopar% dbGetQuery(con, "select * from sometable limit 1") потерпит неудачу, а foreach(i=1:4) %do% dbGetQuery(con, "select * from sometable limit 1") не потерпит неудачу. Когда я говорю, что это не удается, я имею в виду, что получаю сообщение об ошибке с истекшим сроком действия PostgreSQLConnection.   -  person Dean MacGregor    schedule 28.06.2015


Ответы (1)


Я смог воспроизвести вашу проблему локально. Я не совсем уверен, но я думаю, что проблема связана с тем, как clusterEvalQ работает внутри. Например, вы говорите, что dbGetQuery(con, "select inet_client_port()) дал вам вывод порта клиента. Если бы запрос действительно оценивался/выполнялся на узлах кластера, вы не смогли бы увидеть этот вывод (так же, как вы не можете напрямую прочитать любой другой вывод или операторы печати, которые выполняются на внешних узлах кластера).

Следовательно, насколько я понимаю, оценка каким-то образом сначала выполняется в локальной среде, а соответствующие функции и переменные впоследствии копируются/экспортируются в отдельные узлы кластера. Это будет работать для любых других типов функций/переменных, но, очевидно, не для соединений с базой данных. Если соединения/сопоставления портов связаны с главным экземпляром R, то соединения не будут работать из подчиненных экземпляров. Вы также получите точно такую ​​же ошибку, если попытаетесь использовать функцию clusterExport для экспорта соединений, созданных в главном экземпляре.

В качестве альтернативы вы можете создать отдельные соединения внутри отдельных foreach задач. Я проверил с помощью локальной базы данных, что работает следующее:

library(doParallel)
nrCores = detectCores()
cl <- makeCluster(nrCores)
registerDoParallel(cl)
clusterEvalQ(cl,library(RPostgreSQL))
clusterEvalQ(cl,library(DBI))

result <- foreach(i=1:nrCores) %dopar%
{
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
  queryResult <- dbGetQuery(con, "fetch something...")
  dbDisconnect(con)
  return(queryResult)
}
stopCluster(cl)

Однако теперь вы должны учитывать, что вы будете создавать и отключать новое соединение каждые foreach итераций. Из-за этого вы можете понести некоторые накладные расходы на производительность. Вы, очевидно, можете обойти это, разумно разделив свои запросы/данные, чтобы большая часть работы выполнялась за одну итерацию. В идеале вы должны разделить работу на столько ядер, сколько у вас есть.

person Jellen Vermeir    schedule 28.06.2015