Всем привет! Сегодня давайте рассмотрим широковещательные и накопительные переменные в Pyspark…!!!

Переменные трансляции PySpark

🢂 Широковещательные переменные — это общие переменные только для чтения, которые кэшируются и доступны на всех узлах в кластере для доступа или использования задачами.

🢂 Позвольте мне объяснить на примере, когда использовать широковещательные переменные. Предположим, вы получаете двухбуквенный код штата в файле и хотите преобразовать его в полное название штата (например, из CA в Калифорнию, из Нью-Йорка в Нью-Йорк). и т.д.), выполнив поиск сопоставления ссылок. В некоторых случаях эти данные могут быть большими, и у вас может быть много таких запросов (например, почтовый индекс и т. д.).

🢂 Вместо того, чтобы распространять эту информацию вместе с каждой задачей по сети (накладные расходы и затраты времени), мы можем использовать широковещательную переменную для кэширования этой информации поиска на каждой машине, и задачи используют эту кэшированную информацию при выполнении преобразований.

PySpark выполняет следующий процесс:

  • PySpark разбивает задание на этапы с распределенным перемешиванием, и действия выполняются внутри этапа.
  • Более поздние этапы также разбиты на задачи.
  • Spark транслирует общие данные (повторно используемые), необходимые задачам на каждом этапе.
  • Передаваемые данные кэшируются в сериализованном формате и десериализуются перед выполнением каждой задачи.

Давайте посмотрим, как создать широковещательную переменную:

Трансляция PySpark создается с помощью метода broadcast(v) класса SparkContext. Этот метод принимает аргумент v, который вы хотите транслировать.

broadcastVar = sc.broadcast(Array(0, 1, 2, 3))
broadcastVar.value

Пример переменной PySpark RDD Broadcast

Ниже приведен очень простой пример использования широковещательных переменных в RDD. В этом примере часто используемые данные (состояния) определяются в переменной Map, переменная распределяется с помощью SparkContext.broadcast(), а затем эти переменные используются в преобразовании RDD map().

ПРИМЕР:

➤ВВОД

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)

➤ ВЫВОД

🢂 Мы также можем использовать широковещательную переменную в фильтре и соединениях. Ниже приведен пример фильтра.

Переменные аккумуляторов PySpark

🢂 Аккумуляторы имеют общие переменные, которые «добавляются» только посредством ассоциативной и коммутативной операции и используются для выполнения счетчиков (аналогично счетчикам Map-reduce) или операций суммирования.

🢂Аккумуляторы доступны только для записи и инициализируются один раз переменными, где только задачи, запущенные на рабочих процессах, могут обновляться, а обновления от рабочих процессов автоматически распространяются на программу драйвера.

🢂Но доступ к переменной Accumulator разрешен только программе-драйверу с использованием свойства value.

Как создать переменную Accumulator в PySpark?

Используя accumulator() из класса SparkContext, мы можем создать аккумулятор в программировании PySpark. Пользователи также могут создавать аккумуляторы для пользовательских типов, используя AccumulatorParam класс PySpark.

  • sparkContext.accumulator() используется для определения переменных-аккумуляторов.
  • Функция add() используется для добавления/обновления значения в аккумуляторе.
  • value в переменной-аккумуляторе используется для извлечения значения из аккумулятора.

Давайте рассмотрим пример

Пример ниже представляет собой полный пример RDD с использованием различных аккумуляторов.

import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("accumulator").getOrCreate()
accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value)
accuSum=spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum+=x
rdd.foreach(countFun)
print(accuSum.value)
accumCount=spark.sparkContext.accumulator(0)
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x:accumCount.add(1))
print(accumCount.value)

Надеюсь, эта статья поможет вам лучше понять переменные широковещательной передачи и накопителя, пройдя через нее…!!

Увидимся в следующем блоге…..!