Всем привет! Сегодня давайте рассмотрим широковещательные и накопительные переменные в Pyspark…!!!
Переменные трансляции PySpark
🢂 Широковещательные переменные — это общие переменные только для чтения, которые кэшируются и доступны на всех узлах в кластере для доступа или использования задачами.
🢂 Позвольте мне объяснить на примере, когда использовать широковещательные переменные. Предположим, вы получаете двухбуквенный код штата в файле и хотите преобразовать его в полное название штата (например, из CA в Калифорнию, из Нью-Йорка в Нью-Йорк). и т.д.), выполнив поиск сопоставления ссылок. В некоторых случаях эти данные могут быть большими, и у вас может быть много таких запросов (например, почтовый индекс и т. д.).
🢂 Вместо того, чтобы распространять эту информацию вместе с каждой задачей по сети (накладные расходы и затраты времени), мы можем использовать широковещательную переменную для кэширования этой информации поиска на каждой машине, и задачи используют эту кэшированную информацию при выполнении преобразований.
PySpark выполняет следующий процесс:
- PySpark разбивает задание на этапы с распределенным перемешиванием, и действия выполняются внутри этапа.
- Более поздние этапы также разбиты на задачи.
- Spark транслирует общие данные (повторно используемые), необходимые задачам на каждом этапе.
- Передаваемые данные кэшируются в сериализованном формате и десериализуются перед выполнением каждой задачи.
Давайте посмотрим, как создать широковещательную переменную:
Трансляция PySpark создается с помощью метода broadcast(v)
класса SparkContext. Этот метод принимает аргумент v, который вы хотите транслировать.
broadcastVar = sc.broadcast(Array(0, 1, 2, 3))
broadcastVar.value
Пример переменной PySpark RDD Broadcast
Ниже приведен очень простой пример использования широковещательных переменных в RDD. В этом примере часто используемые данные (состояния) определяются в переменной Map, переменная распределяется с помощью SparkContext.broadcast()
, а затем эти переменные используются в преобразовании RDD map().
ПРИМЕР:
➤ВВОД
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
]
rdd = spark.sparkContext.parallelize(data)
def state_convert(code):
return broadcastStates.value[code]
result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)
➤ ВЫВОД
🢂 Мы также можем использовать широковещательную переменную в фильтре и соединениях. Ниже приведен пример фильтра.
Переменные аккумуляторов PySpark
🢂 Аккумуляторы имеют общие переменные, которые «добавляются» только посредством ассоциативной и коммутативной операции и используются для выполнения счетчиков (аналогично счетчикам Map-reduce) или операций суммирования.
🢂Аккумуляторы доступны только для записи и инициализируются один раз переменными, где только задачи, запущенные на рабочих процессах, могут обновляться, а обновления от рабочих процессов автоматически распространяются на программу драйвера.
🢂Но доступ к переменной Accumulator разрешен только программе-драйверу с использованием свойства value.
Как создать переменную Accumulator в PySpark?
Используя accumulator() из класса SparkContext, мы можем создать аккумулятор в программировании PySpark. Пользователи также могут создавать аккумуляторы для пользовательских типов, используя AccumulatorParam
класс PySpark.
- sparkContext.accumulator() используется для определения переменных-аккумуляторов.
- Функция add() используется для добавления/обновления значения в аккумуляторе.
- value в переменной-аккумуляторе используется для извлечения значения из аккумулятора.
Давайте рассмотрим пример
Пример ниже представляет собой полный пример RDD с использованием различных аккумуляторов.
import pyspark from pyspark.sql import SparkSession spark=SparkSession.builder.appName("accumulator").getOrCreate()
accum=spark.sparkContext.accumulator(0) rdd=spark.sparkContext.parallelize([1,2,3,4,5]) rdd.foreach(lambda x:accum.add(x)) print(accum.value)
accuSum=spark.sparkContext.accumulator(0) def countFun(x): global accuSum accuSum+=x rdd.foreach(countFun) print(accuSum.value)
accumCount=spark.sparkContext.accumulator(0) rdd2=spark.sparkContext.parallelize([1,2,3,4,5]) rdd2.foreach(lambda x:accumCount.add(1)) print(accumCount.value)
Надеюсь, эта статья поможет вам лучше понять переменные широковещательной передачи и накопителя, пройдя через нее…!!
Увидимся в следующем блоге…..!