Я пытаюсь использовать параллельные коллекции Scala для параллельного выполнения некоторых вычислений. Поскольку входных данных много, я использую изменяемые массивы для хранения данных, чтобы избежать проблем с сборщиком мусора. Это первоначальный подход, который я использовал:
// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
inputData(i) = new Array[Int](arraySize)
}
// process the input
while (haveMoreInput()) {
// read the input--must be sequential!
for (array <- 0 until inputData.length) {
for (index <- 0 until arraySize) {
array(index) = deserializeFromExternalSource()
}
}
// map the data in parallel
// note that the input data is NOT modified by longRuningProcess
val results = for (array <- inputData.par) yield {
longRunningProcess(array)
}
// use the results--must be sequential and ordered as input
for (result <- results.toArray) {
useResult(result)
}
}
Учитывая, что базовый массив ParallelArray
можно безопасно повторно использовать (а именно, модифицировать и использовать в качестве базовой структуры другого ParallelArray
), приведенный выше фрагмент должен работать, как и ожидалось. Однако при запуске он вылетает с ошибкой памяти:
*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***
Это якобы связано с тем, что параллельная коллекция напрямую использует массив, из которого она была создана; возможно, он пытается освободить этот массив, когда он выходит за рамки. В любом случае создание нового массива с каждым циклом не вариант, опять же, из-за ограничений памяти. Явное создание var parInputData = inputData.par
внутри и снаружи цикла while
приводит к одной и той же ошибке двойного освобождения.
Я не могу просто сделать саму inputData
параллельной коллекцией, потому что ее нужно заполнять последовательно (попробовав сделать присваивания параллельной версии, я понял, что присваивания выполнялись не по порядку). Использование Vector
в качестве внешней структуры данных, кажется, работает для относительно небольших размеров входных данных (‹ 1000000 входных массивов), но приводит к исключениям накладных расходов GC на больших входных данных.
Подход, который я выбрал, заключался в создании Vector[Vector[Array[Int]]]
с внешним вектором, имеющим длину, равную числу используемых параллельных потоков. Затем я вручную заполнил каждый sub-Vector
куском входных массивов данных, а затем сделал параллельную карту по внешнему вектору.
Этот последний подход работает, но утомительно вручную разделять входные данные на фрагменты и добавлять эти фрагменты в параллельную коллекцию на другом уровне глубины. Есть ли способ разрешить Scala повторно использовать изменяемый массив для параллельных операций?
РЕДАКТИРОВАТЬ: Сравнительный анализ решения с параллельным вектором выше по сравнению с решением, распараллеленным вручную с использованием синхронных очередей, показал, что параллельный вектор работает примерно на 50% медленнее. Мне интересно, это просто накладные расходы на лучшую абстракцию или этот разрыв можно уменьшить за счет использования параллельных массивов, а не Vector
s; это привело бы к еще одному преимуществу использования массивов по сравнению с Vector
s.
double free or corruption
только из-за GC. - person om-nom-nom   schedule 01.08.2014GC overhead
, о котором я говорю в отношенииVector
, на самом деле являетсяjava.lang.OutOfMemoryError
, а не проблемой двойной бесплатной JVM. - person Ben Sidhom   schedule 01.08.2014