Как создать подключение(я) к источнику данных в Spark Streaming для поиска

У меня есть вариант использования, когда мы транслируем события, и для каждого события мне нужно выполнить поиск. Поиски находятся в Redis, и мне интересно, как лучше всего создавать соединения. Искровая потоковая передача будет запускать 40 исполнителей, и у меня есть 5 таких заданий потоковой передачи, все из которых подключаются к одному и тому же кластеру Redis. Поэтому я смущен, какой подход я должен использовать для создания соединения Redis.

  1. Создайте объект подключения в драйвере и передайте его исполнителям (не уверен, что это действительно работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?

  2. Создайте соединение Redis для каждого раздела, однако у меня есть код, написанный таким образом.

    val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })

Итак, теперь, если я создам соединение внутри каждого раздела, это будет означать, что для каждого RDD и для каждого раздела в этом RDD я буду создавать новое соединение.

Есть ли способ поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось снова и снова создавать соединения?

Я могу добавить больше контекста/информации, если это необходимо.


person user3679686    schedule 15.03.2019    source источник


Ответы (1)


1. Создайте объект подключения в драйвере и передайте его исполнителям (не уверен, что это действительно работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?

Ответ - Нет. Большинство объектов подключения не сериализуемы из-за машинно-зависимых данных, связанных с подключением.

2. Есть ли способ поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?

Ответ: Да, создайте пул соединений и используйте его в разделе. вот это стиль. Вы можете создать пул соединений, подобный этому https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

а затем использовать его

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

Пожалуйста, проверьте это: шаблон проектирования для использования foreachRDD

person deo    schedule 15.03.2019