HashMap против ConcurrentHashMap: передача между потоками

У меня вопрос по использованию карт в многопоточном приложении. Предположим, у нас есть такой сценарий:

  1. Поток получает данные json как List<Map<String, Object>>, которые десериализуются Джексоном Джсоном.
  2. Этот поток изменяет полученные карты.
  3. А затем помещает список в очередь блокировки для использования другим потоком.

Как видите, карта модифицируется только одним потоком, но затем она становится доступной только для чтения (ничего не меняется, просто больше не модифицируется) и передается другому потоку. Затем, когда я изучил реализации HasMap (также TreeMap) и ConcurrentHashMap, последний имеет volatile полей, а первые два — нет. Итак, какую реализацию Map мне следует использовать в этом случае? Является ли ConcurrentHashMap излишним выбором или его должно использовать из-за передачи между потоками?

Мои простые тесты показывают, что я могу использовать HashMap/TreeMap, когда они изменяются синхронно, и это работает, но мой вывод или мой тестовый код могут быть неверными:

def map = new TreeMap() // or HashMap
def start = new CountDownLatch(1)
def threads = (1..5)
println("Threads: " + threads)
def created = new CountDownLatch(threads.size())
def completed = new CountDownLatch(threads.size())
threads.each {i ->
    new Thread({
        def from = i * 10
        def to = from + 10
        def local = (from..to)
        println(Thread.currentThread().name + " " + local)
        created.countDown()
        start.await()
        println('Mutating by ' + local)
        local.each {number ->
            synchronized (map) {
                map.put(number, ThreadLocalRandom.current().nextInt())
            }
            println(Thread.currentThread().name + ' added ' + number +  ': ' + map.keySet())
        }
        println 'Done: ' + Thread.currentThread().name
        completed.countDown()
    }).start()
}

created.await()
start.countDown()
completed.await()
println('Completed:')
map.each { e ->
    println('' + e.key + ': ' + e.value)
}

Основной поток порождает 5 дочерних потоков, которые синхронно обновляют общую карту, когда они завершаются, основной поток успешно видит все обновления дочерними потоками.


person cybersoft    schedule 18.09.2020    source источник
comment
Обратите внимание, что на практике большую часть этого следует делать с помощью исполнителя.   -  person chrylis -cautiouslyoptimistic-    schedule 18.09.2020
comment
Вы всегда можете обернуть карту в неизменяемые с помощью Collections.immutableMap(Map map). Таким образом, вы уверены, что параллельная модификация никогда не произойдет.   -  person Polygnome    schedule 18.09.2020
comment
@cybersoft Неизменяемые структуры данных по своей сути потокобезопасны из-за того, что они вообще не могут изменяться.   -  person Polygnome    schedule 18.09.2020
comment
@cybersoft Если вы сохраните ссылку на базовую коллекцию и измените ее, ваша структура данных больше не будет неизменной. Вы только что проделали гигантскую зияющую дыру в его неизменности. Но Op явно сказал, что данные не изменяются, и обертывание их в неизменяемую коллекцию, а затем сохранение только неизменяемых ссылок дает вам необходимые гарантия неизменности.   -  person Polygnome    schedule 18.09.2020
comment
@Polygnome тот факт, что объект является неизменным, не означает, что он защищен от проблем с видимостью памяти. Возьмите POJO без сеттера, он неизменен. Но если его поля не final и не volatile, это также не гарантирует вам параллелизма! Неизменяемость, как вы это делаете здесь (например, обертывание с помощью unmodifiableMap), оставляет исходную карту структурно неизменной и такой же безопасной или небезопасной, как и раньше. Если вам нужны конструкции, безопасные для параллелизма, этого недостаточно. Вам нужна неизменность + безопасная публикация и/или «происходит до» (java.util.concurrent).   -  person GPI    schedule 18.09.2020
comment
@güriösä определенно необходимы гарантии параллелизма как в сценарии OP, так и в его тестовом коде (кстати, это совершенно разные сценарии).   -  person GPI    schedule 18.09.2020
comment
@güriösä Что, если потребляющий поток является частью фиксированного пула потоков (они живы вечно), но производящие потоки с количеством 1 ... N периодически создаются недолговечными?   -  person cybersoft    schedule 18.09.2020
comment
Кроме того, если n производителей создаются периодически, в то время как потребители живут вечно, блокирующая очередь гарантирует правильное поведение производитель-потребитель. Но вам, скорее всего, понадобится структура сохранения потока вместо List‹Map‹String, Object›› для нескольких производителей одновременно, например. ConcurrentLinkedList   -  person Nikolai Dmitriev    schedule 18.09.2020
comment
Re, ... он становится доступным только для чтения... После того, как программа завершила заполнение какого-либо изменяемого объекта, и после этого безопасно публикует внесенные изменения, с этого момента мы говорим, что объект фактически неизменным.   -  person Solomon Slow    schedule 18.09.2020
comment
@GPI оболочка, возвращаемая Collections.unmodifiableMap(…), использует поле final. Если вы получаете доступ к исходной карте только через это поле (что означает, что вы никогда не будете изменять исходную карту после создания оболочки), у вас есть гарантии неизменяемости объектов. Тем не менее, отказ от безопасной публикации редко приносит пользу.   -  person Holger    schedule 24.09.2020


Ответы (2)


Классы java.util.concurrent имеют особые гарантии. относительно последовательности:

Эффекты согласованности памяти: как и в случае с другими параллельными коллекциями, действия в потоке до помещения объекта в BlockingQueue происходят до действий, следующих за доступом или удалением этого элемента из BlockingQueue в другом потоке.

Это означает, что вы можете использовать любые изменяемые объекты и манипулировать ими по своему усмотрению, а затем помещать их в очередь. Когда он будет получен, все примененные вами манипуляции будут видны.

(Обратите внимание, что тип теста, который вы продемонстрировали, может только доказать отсутствие безопасности; в большинстве реальных случаев несинхронизированный код работает нормально в 99% случаев. Это последний 1%, который вас кусает. .)

person chrylis -cautiouslyoptimistic-    schedule 18.09.2020
comment
Я прав: если я использую простой HashMap, обновляю его и помещаю, скажем, в ArrayBlockingQueue, гарантируется, что потребляющий поток получит обновленный HashMap? Если я вижу обратное на производстве, это ошибка? (openjdk-12.0.2) - person cybersoft; 18.09.2020
comment
@cybersoft Это именно гарантия: операции на карте до ее вставки в очередь гарантированно будут видны. - person chrylis -cautiouslyoptimistic-; 18.09.2020
comment
У меня уже есть 2 проблемы с одной и той же проблемой: HashMap теряет вложенный HashMap, когда потребляющий поток принимает корневой. Оба сервера работают под управлением Intel Xeon с узлами numa. Но java.util.concurrent гарантирует, что это происходит раньше, чем в памяти ... Не для процессора numa? или это ошибка в java? - person cybersoft; 08.10.2020

Этот вопрос имеет широкий охват.

Ваш оригинальный сценарий

Ты говоришь :

[A] карта модифицируется только одним потоком, но затем становится доступной только для чтения

Сложная часть — это слово «тогда». Когда вы, программист, говорите тогда, вы имеете в виду время часов, например. Я сделал это, теперь делай то. Но по невероятно большому количеству причин компьютер не думает (исполняет код) таким образом. То, что было раньше, и то, что происходит после, нужно синхронизировать вручную, чтобы компьютер увидел мир таким, каким его видим мы.

Вот как модель памяти Java выражает вещи: если вы хотите, чтобы ваши объекты вели себя предсказуемо в параллельной среде, вы должны убедиться, что вы устанавливаете их до границ.

Есть несколько вещей, которые устанавливаются до отношений в java-коде. Немного упрощая, и лишь некоторые из них:

  • порядок выполнения в одном потоке (если операторы 1 и 2 выполняются одним и тем же потоком в этом порядке, все, что сделал 1, всегда видно оператору 2)
  • Когда поток t1 start()s t2, все, что t1 делал до запуска t2, видно t2. Взаимно с join()
  • То же самое и с synchronized, мониторинг объектов: каждое действие, выполненное потоком внутри синхронизируемого блока, видно другому потоку, который синхронизируется в том же экземпляре.
  • То же самое касается любых специализированных методов java.util.concurrent классов. например, замки и семафоры, конечно, но также и коллекции: если вы помещаете элемент в синхронизированную коллекцию, поток, который извлекает его, имеет место перед потоком, который его поместил.
  • Если у T2 есть событие раньше с T1, а если у T3 есть одно с T2, то у T3 также есть это с T1.

Итак, вернемся к вашей фразе

затем он становится доступным только для чтения

Он становится доступным для чтения. Но чтобы компьютер увидел это, вы должны придать этому смысл; то есть: вы должны поместить happen before relationship в свой код.

Позже вы заявляете:

А затем помещает список в очередь блокировки

java.util.concurrent очередь? Как это аккуратно! Так уж получилось, что поток, извлекающий объект из параллельной очереди, имеет отношение «происходит до» с ответом на поток, который поместил указанный объект в очередь.

Вы установили реальность. Все мутации, сделанные (ранее) потоком, поставившим объект в очередь, благополучно видны тому, который его вытаскивает. В этом случае вам не нужен ConcurrentHashMap (если, конечно, ни один другой поток не изменяет те же данные).

Ваш пример кода

В вашем примере кода не используется очередь. И он изменяет одну карту, модифицированную несколькими потоками (а не наоборот, как упоминается в вашем сценарии). Так что это просто... не то же самое. Но в любом случае, ваш код в порядке.

Потоки, обращающиеся к карте, делают это так:

synchronized (map) {
    map.put(number, ThreadLocalRandom.current().nextInt())
}

synchornize обеспечивает 1) взаимное исключение потоков и 2) a происходит раньше. Таким образом, каждый поток, который входит в синхронизацию, видит все, что произошло раньше в другом потоке, который также синхронизировался с ним (то есть все они).

Так что здесь нет проблем.

И тогда ваш основной поток делает:

completed.await()
println('Completed:')
map.each { e ->
   println('' + e.key + ': ' + e.value)
}

Здесь вас спасает completed.await(). Это устанавливает, что происходит раньше с каждым потоком, вызвавшим countDown(), то есть со всеми ними. Таким образом, ваш основной поток видит все, что было сделано рабочими потоками. Все в порядке.

За исключением... Мы часто забываем проверять загрузку потоков. Первый раз, когда воркер синхронизируется с экземпляром карты, никто этого раньше не делал. Почему мы можем быть уверены, что они видят экземпляр карты полностью инициализированным и готовым?

Ну по двум причинам:

  1. Вы инициализируете экземпляр карты ПЕРЕД вызовом thread.start(), который устанавливает, что это произошло раньше. этого было бы достаточно
  2. Внутри ваших рабочих потоков вы также используете защелки перед началом работы, которые затем снова устанавливают отношения.

Вы в безопасности вдвойне.

person GPI    schedule 18.09.2020
comment
Теперь я понимаю, что мой фрагмент кода — это не то, что я хотел показать, этот фрагмент никогда не сломается из-за описанного множества событий, прежде чем - person cybersoft; 18.09.2020