Проблема с производительностью в PySpark/Aws Glue

У меня есть датафрейм. Мне нужно преобразовать каждую запись в JSON, а затем вызвать API с полезной нагрузкой JSON, чтобы вставить данные в postgress. У меня есть 14000 записей в кадре данных, и чтобы вызвать API и получить ответ, требуется 5 часов. Есть ли способ улучшить производительность. Ниже мой фрагмент кода.

df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()

json_insert = df_insert.toJSON().collect()

for row in json_insert:
  line = json.loads(row)
    headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache",
    }
  response = requests.request("POST", url_insert, data=payload, headers=headers)
  print(response.text)
  res = response.text
  response_result = json.loads(res)
  #print(response_result["httpStatus"])
  if response_result["message"] == 'success':
      print ("INFO : Record inserted successfully")
  else:
      print ("ERROR : Error in the record")
      status_code = response_result["status"]
      error_message =  response_result["error"]
      my_list = [(status_code,error_message,row)]
      df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
      df.write.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("dbtable", "error_table") \
      .option("header", "true") \
      .option("truncate_table", "on") \
      .mode("append") \
      .save()

Примечание. Я знаю, что, выполняя «json_insert = df_insert.toJSON().collect()», я теряю преимущество фрейма данных. Есть ли лучший способ выполнить.


person Basant Jain    schedule 01.03.2019    source источник


Ответы (1)


df_insert.toJSON() возвращает RDD, который вы можете flatMap преодолеть. 1

source_rdd = df_insert.toJSON()

Выполните flatMap над этим СДР и получите обратно СДР, содержащий только ошибки.

headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache"
}

def post_service_error(row):
    # requests package may not be available in the node
    # see about adding files to the spark context
    response = requests.request("POST", url_insert, data=row, headers=headers)
    response_result = response.json()
    if response_result['message'] == 'success':
        print ("INFO : Record inserted successfully")
        return []
    print ("ERROR : Error in the record")
    status_code = response_result["status"]
    error_message =  response_result["error"]
    return [(status_code, error_message, row)]

errors_rdd = source_rdd.flatMap(post_service_error)

Преобразуйте ошибки RDD в искровой DataFrame и сохраните его в таблице.

errors_df = sc.createDataFrame(errors_rdd, ['status', 'error', 'json data'])
(errors_df.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(**sfOptions)
  .option("dbtable", "error_table")
  .option("header", "true")
  .option("truncate_table", "on")
  .mode("append")
  .save())

Если у вас есть API, к которому вы делаете запрос, я предлагаю изучить реализацию, которая принимает пакет этих объектов/массивов. Таким образом, вы можете разбить RDD перед сопоставлением каждого раздела с пакетным запросом и после этого обработать ошибку.

person Oluwafemi Sule    schedule 01.03.2019
comment
У нас нет API, и мы должны перебирать каждый объект json и вызывать API. При описанном выше подходе задание не выполняется внутри функции post_service_error, когда они вызываются с использованием flatMap. - person Basant Jain; 02.03.2019
comment
Вы можете использовать flatMap для списка кортежей ошибок, если у вас нет этого API, как показано в примере. Работа не входит в функцию — не уверен, что полностью понимаю, что вы имеете в виду. Какая ошибка регистрируется в вашей основной консоли или консоли узла по этому поводу? - person Oluwafemi Sule; 02.03.2019
comment
Я начал с приведенного ниже кода: def post_service_error(row): print (внутренний метод ..............) print(row) print('Начало задания.......... .') source_rdd.map(post_service_error) print('Задание завершено...............') В выводе начальное задание и завершение задания печатаются, но не внутри метода - person Basant Jain; 02.03.2019
comment
У вас по-прежнему есть вызов collect на Rdd независимо от того, flatMap или map вы распределяете задачу по узлам и собираете результаты задачи. map и flatMap — это просто способы объявления задач. Как вы написали в своем вопросе, вы запускаете задачу на мастере и не используете другие узлы, которые могут быть в вашем кластере для запуска задач. - person Oluwafemi Sule; 02.03.2019
comment
Да, я не могу воспользоваться другими узлами из-за сбора, и в результате работа занимает слишком много времени. Поэтому, чтобы улучшить результат, я попробовал способ, который вы предоставили. Здесь, когда я попытался вызвать функцию (post_service_error) из flatMap (source_rdd.flatMap (post_service_error)), я ничего не получил. Я попытался только дать команду печати внутри функции, но она не печатается. Извините за мое недопонимание - person Basant Jain; 02.03.2019
comment
Понимаю. Вы проверяли журналы Hadoop? Вы можете проверить журналы рабочих процессов, чтобы узнать, не записано ли там что-либо о задаче. stackoverflow.com/a/39911816/5189811 - person Oluwafemi Sule; 02.03.2019
comment
Конечно, вы можете сделать это или использовать многопроцессорность. - person Oluwafemi Sule; 02.03.2019