Да, это возможно. К этому есть два подхода:
- Используйте
transformSQL
из RedShiftCopyActivity.
transformSQL
полезно, если преобразования выполняются в рамках записи, которая загружается своевременно, например каждый день или час. Таким образом, изменения применяются только к партии, а не ко всей таблице.
Вот выдержка из документации:
transformSql: выражение SQL SELECT, используемое для преобразования входных данных. Когда вы копируете данные из DynamoDB или Amazon S3, AWS Data Pipeline создает таблицу, называемую промежуточной, и изначально загружает ее туда. Данные из этой таблицы используются для обновления целевой таблицы. Если указан параметр transformSql, из указанного оператора SQL создается вторая промежуточная таблица. Данные из этой второй промежуточной таблицы затем обновляются в окончательной целевой таблице. Таким образом, transformSql необходимо запустить в таблице с именем staging, а схема вывода transformSql должна соответствовать схеме конечной целевой таблицы.
Пожалуйста, найдите ниже пример использования transformSql. Обратите внимание, что выберите из staging
таблицы. Он будет эффективно работать CREATE TEMPORARY TABLE staging2 AS SELECT <...> FROM staging;
. Кроме того, все поля должны быть включены и соответствовать существующей таблице в RedShift DB.
{
"id": "LoadUsersRedshiftCopyActivity",
"name": "Load Users",
"insertMode": "OVERWRITE_EXISTING",
"transformSql": "SELECT u.id, u.email, u.first_name, u.last_name, u.admin, u.guest, CONVERT_TIMEZONE('US/Pacific', cs.created_at_pst) AS created_at_pst, CONVERT_TIMEZONE('US/Pacific', cs.updated_at_pst) AS updated_at_pst FROM staging u;",
"type": "RedshiftCopyActivity",
"runsOn": {
"ref": "OregonEc2Resource"
},
"schedule": {
"ref": "HourlySchedule"
},
"input": {
"ref": "OregonUsersS3DataNode"
},
"output": {
"ref": "OregonUsersDashboardRedshiftDatabase"
},
"onSuccess": {
"ref": "LoadUsersSuccessSnsAlarm"
},
"onFail": {
"ref": "LoadUsersFailureSnsAlarm"
},
"dependsOn": {
"ref": "BewteenRegionsCopyActivity"
}
}
- Используйте
script
из SqlActivity.
SqlActivity позволяет выполнять операции со всем набором данных и может быть запланирован для запуска после определенных событий с помощью механизма dependsOn
.
{
"name": "Add location ID",
"id": "AddCardpoolLocationSqlActivity",
"type": "SqlActivity",
"script": "INSERT INTO locations (id) SELECT 100000 WHERE NOT EXISTS (SELECT * FROM locations WHERE id = 100000);",
"database": {
"ref": "DashboardRedshiftDatabase"
},
"schedule": {
"ref": "HourlySchedule"
},
"output": {
"ref": "LocationsDashboardRedshiftDatabase"
},
"runsOn": {
"ref": "OregonEc2Resource"
},
"dependsOn": {
"ref": "LoadLocationsRedshiftCopyActivity"
}
}
person
Vadym Tyemirov
schedule
18.11.2014