У меня есть датафрейм. Мне нужно преобразовать каждую запись в JSON, а затем вызвать API с полезной нагрузкой JSON, чтобы вставить данные в postgress. У меня есть 14000 записей в кадре данных, и чтобы вызвать API и получить ответ, требуется 5 часов. Есть ли способ улучшить производительность. Ниже мой фрагмент кода.
df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()
json_insert = df_insert.toJSON().collect()
for row in json_insert:
line = json.loads(row)
headers = {
'Authorization': authorization,
'content-type': "application/json",
'cache-control': "no-cache",
}
response = requests.request("POST", url_insert, data=payload, headers=headers)
print(response.text)
res = response.text
response_result = json.loads(res)
#print(response_result["httpStatus"])
if response_result["message"] == 'success':
print ("INFO : Record inserted successfully")
else:
print ("ERROR : Error in the record")
status_code = response_result["status"]
error_message = response_result["error"]
my_list = [(status_code,error_message,row)]
df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
df.write.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "error_table") \
.option("header", "true") \
.option("truncate_table", "on") \
.mode("append") \
.save()
Примечание. Я знаю, что, выполняя «json_insert = df_insert.toJSON().collect()», я теряю преимущество фрейма данных. Есть ли лучший способ выполнить.