Я пишу сценарий pythonc, который использует boto3
библиотеку python для запроса базы данных Aurora Serverless (PostgreSQL). Я использую DATA API для пакетной вставки (я делаю это несколькими партиями) очень большого CSV
файла, содержащего более 6 миллионов записей в БД. Каждая запись содержит 37 столбцов. Когда я запускаю сценарий на своем ПК (я настроил учетные данные AWS-CLI, чтобы использовать пользователя, авторизованного для разговора с БД Aurora в облаке), я могу успешно вставить несколько пакетов, содержащих 1800 операторов вставки SQL, прежде чем я получить эту ошибку:
An error occurred (413) when calling the BatchExecuteStatement operation
Скрипт Python:
import boto3
import csv
rds_client = boto3.client('rds-data')
database_name = "postgres"
db_cluster_arn = "arn:aws:rds:us-east-1:xxxxxxxx:cluster:database-1"
db_credentials_secrets_store_arn = "arn:aws:secretsmanager:us-east-1:xxxxxxxxxx:secret:rds-db-credentials/cluster-VPF7JUKVRLQMEHF4QV2HSIKELM/postgres-QVIzWC"
def batch_execute_statement(sql, sql_parameter_sets, transaction_id=None):
parameters = {
'secretArn': db_credentials_secrets_store_arn,
'database': database_name,
'resourceArn': db_cluster_arn,
'sql': sql,
'parameterSets': sql_parameter_sets
}
if transaction_id is not None:
parameters['transactionId'] = transaction_id
response = rds_client.batch_execute_statement(**parameters)
return response
def get_entry(row):
entry = [
{'name': 'RIG_ID', 'value': {'stringValue': row['RIG_ID']}},
# DEPTH_CAPACITY
if row['DEPTH_CAPACITY'] == '':
entry.append({'name': 'DEPTH_CAPACITY',
'value': {'isNull': True}})
else:
entry.append({'name': 'DEPTH_CAPACITY', 'typeHint': 'DECIMAL', 'value': {
'stringValue': row['DEPTH_CAPACITY']}})
# MANY MORE ENTRIES HERE
return entry
def execute_transaction(sql, parameter_set):
transaction = rds_client.begin_transaction(secretArn=db_credentials_secrets_store_arn,resourceArn=db_cluster_arn,database=database_name)
try:
response = batch_execute_statement(sql, parameter_set, transaction['transactionId'])
except Exception as e:
transaction_response = rds_client.rollback_transaction(
secretArn=db_credentials_secrets_store_arn,
resourceArn=db_cluster_arn,
transactionId=transaction['transactionId'])
else:
transaction_response = rds_client.commit_transaction(
secretArn=db_credentials_secrets_store_arn,
resourceArn=db_cluster_arn,
transactionId=transaction['transactionId'])
print(f'Number of records updated: {len(response["updateResults"])}')
print(f'Transaction Status: {transaction_response["transactionStatus"]}')
batch_size = 1800
current_batch_size = 0
transaction_count = 0
sql = 'INSERT INTO T_RIG_ACTIVITY_STATUS_DATE VALUES (\
:RIG_ID, :DEPTH_CAPACITY, # MANY MORE ENTRIES HERE);'
parameter_set = []
with open('LARGE_FILE.csv', 'r') as file:
reader = csv.DictReader(file, delimiter=',')
for row in reader:
entry = get_entry(row)
if(current_batch_size == batch_size):
execute_transaction(sql, parameter_set)
transaction_count = transaction_count + 1
print(f'Transaction count: {transaction_count}')
current_batch_size = 0
parameter_set.clear()
else:
parameter_set.append(entry)
current_batch_size = current_batch_size + 1
Насколько я понимаю, код ошибки 413
означает, что «Запросить объект тоже Large, что предполагает, что мне нужно уменьшить размер пакета, который я отправляю в БД (через DATA API), но я не могу понять, почему я могу отправить несколько пакетов из 1800 операторов SQL, прежде чем я начну получать указанную выше ошибку. . Какие-либо предложения?
Кроме того, каково было бы лучшее разрешение в моем случае, учитывая данные, которые мне нужно вставить / вставить в базу данных?