Как можно повторно использовать параллельный массив?

Я пытаюсь использовать параллельные коллекции Scala для параллельного выполнения некоторых вычислений. Поскольку входных данных много, я использую изменяемые массивы для хранения данных, чтобы избежать проблем с сборщиком мусора. Это первоначальный подход, который я использовал:

// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
  inputData(i) = new Array[Int](arraySize)
}

// process the input
while (haveMoreInput()) {
  // read the input--must be sequential!
  for (array <- 0 until inputData.length) {
    for (index <- 0 until arraySize) {
      array(index) = deserializeFromExternalSource()
    }
  }
  // map the data in parallel
  // note that the input data is NOT modified by longRuningProcess
  val results = for (array <- inputData.par) yield {
    longRunningProcess(array)
  }
  // use the results--must be sequential and ordered as input
  for (result <- results.toArray) {
    useResult(result)
  }
}

Учитывая, что базовый массив ParallelArray можно безопасно повторно использовать (а именно, модифицировать и использовать в качестве базовой структуры другого ParallelArray), приведенный выше фрагмент должен работать, как и ожидалось. Однако при запуске он вылетает с ошибкой памяти:

*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***

Это якобы связано с тем, что параллельная коллекция напрямую использует массив, из которого она была создана; возможно, он пытается освободить этот массив, когда он выходит за рамки. В любом случае создание нового массива с каждым циклом не вариант, опять же, из-за ограничений памяти. Явное создание var parInputData = inputData.par внутри и снаружи цикла while приводит к одной и той же ошибке двойного освобождения.

Я не могу просто сделать саму inputData параллельной коллекцией, потому что ее нужно заполнять последовательно (попробовав сделать присваивания параллельной версии, я понял, что присваивания выполнялись не по порядку). Использование Vector в качестве внешней структуры данных, кажется, работает для относительно небольших размеров входных данных (‹ 1000000 входных массивов), но приводит к исключениям накладных расходов GC на больших входных данных.

Подход, который я выбрал, заключался в создании Vector[Vector[Array[Int]]] с внешним вектором, имеющим длину, равную числу используемых параллельных потоков. Затем я вручную заполнил каждый sub-Vector куском входных массивов данных, а затем сделал параллельную карту по внешнему вектору.

Этот последний подход работает, но утомительно вручную разделять входные данные на фрагменты и добавлять эти фрагменты в параллельную коллекцию на другом уровне глубины. Есть ли способ разрешить Scala повторно использовать изменяемый массив для параллельных операций?

РЕДАКТИРОВАТЬ: Сравнительный анализ решения с параллельным вектором выше по сравнению с решением, распараллеленным вручную с использованием синхронных очередей, показал, что параллельный вектор работает примерно на 50% медленнее. Мне интересно, это просто накладные расходы на лучшую абстракцию или этот разрыв можно уменьшить за счет использования параллельных массивов, а не Vectors; это привело бы к еще одному преимуществу использования массивов по сравнению с Vectors.


person Ben Sidhom    schedule 31.07.2014    source источник
comment
Связан ли longRunningProcess с JNI/JNA? Потому что я почти уверен, что если вы не столкнетесь с какой-то непонятной ошибкой JVM, невозможно получить double free or corruption только из-за GC.   -  person om-nom-nom    schedule 01.08.2014
comment
Нет, это чистый вызов метода Java. На самом деле я не предполагаю, что эта проблема вызвана накладными расходами GC, а скорее реализацией массива параллельных коллекций Scala.   -  person Ben Sidhom    schedule 01.08.2014
comment
Я также должен уточнить, что GC overhead, о котором я говорю в отношении Vector, на самом деле является java.lang.OutOfMemoryError, а не проблемой двойной бесплатной JVM.   -  person Ben Sidhom    schedule 01.08.2014


Ответы (1)


На самом деле нет смысла разбивать ваши данные на куски, большая часть смысла библиотеки Parallel Collections заключается в том, что она делает это за вас и делает работу намного лучше, чем использование блоков фиксированного размера. Кроме того, массивы массивов в JVM не похожи на массивы массивов в C, они больше похожи на массивы указателей на множество маленьких массивов, что делает их неэффективными.

Более элегантный способ решить эту проблему — использовать обычный Array и использовать ParRange для работы с ним. longRunningProcess нужно было бы изменить, чтобы работать с одним элементом за раз:

val arraySize = ???

val inputData = Array[Int](arraySize)
val outputData = Array[ResultType](arraySize)

while(haveMoreInput()) {
  for (i <- 0 until arraySize)
    inputData(i) = deserializeFromExternalSource()
  for (i <- (0 until arraySize).par)
    outputData(i) = longRunningProcess(inputData(i))
  outputData.foreach(useResult)
}

Это использует только два больших массива и никогда не выделяет новые массивы. ParArray.map, ParArray.toArray и Array.par выделили новые массивы в исходном коде.

Нам по-прежнему нужно использовать фиксированный arraySize, чтобы убедиться, что мы не загружаем в память больше данных, для которых у нас есть место. Лучшим решением было бы использовать реактивные потоки, но они еще не готовы к работе.

person wingedsubmariner    schedule 01.08.2014
comment
На самом деле я не разбиваю его на более мелкие массивы. longRunningProcess — это метод Java, над которым я не контролирую и который требует ввода массива. Рассмотрим каждый подмассив как один вход. - person Ben Sidhom; 02.08.2014
comment
Однако я не заметил вашего метода заполнения выходных данных; это выглядит очень полезным для сохранения порядка. Подобный подход можно использовать даже в том случае, когда атомы входных данных представляют собой массивы. - person Ben Sidhom; 02.08.2014