невозможно скопировать json - Dynamo db Streams в красное смещение

Ниже приведен вариант использования, над которым я работаю: я настроил enable Streams при создании DynamoDB с new and old Image. Я создал Kinesis Firehose delivery stream с пунктом назначения как Redshift(Intermediate s3).

Из Dynamodb мой поток достигает Firhose и оттуда в Bucket в формате JSON (S3 Bucket -Gzip), приведенном ниже. Моя проблема в том, что я cannot COPY this JSON to redshift.

Вещи, которые я не могу получить:

    1. Not Sure what should be the Create table Statement in Redshift
    1. What should be the COPY Syntax in Kinesis firhose.
    1. How should i use JsonPaths here. Kinesis Data firehouse set to return only json to my s3 bucket.
    1. How to mention the Maniphest in the COPY Command

Загрузка JSON в S3 показана ниже:

{
    "Keys": {
        "vehicle_id": {
            "S": "x011"
        }
    },
    "NewImage": {
        "heart_beat": {
            "N": "0"
        },
        "cdc_id": {
            "N": "456"
        },
        "latitude": {
            "N": "1.30951"
        },
        "not_deployed_counter": {
            "N": "1"
        },
        "reg_ind": {
            "N": "0"
        },
        "operator": {
            "S": "x"
        },
        "d_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "z_id": {
            "N": "1267"
        },
        "last_end_trip_dttm": {
            "S": "11/08/2018 1:43:46 PM"
        },
        "land_ind": {
            "N": "1"
        },
        "s_ind": {
            "N": "1"
        },
        "status_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "case_ind": {
            "N": "1"
        },
        "last_po_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "violated_duration": {
            "N": "20"
        },
        "vehicle_id": {
            "S": "x011"
        },
        "longitude": {
            "N": "103.7818"
        },
        "file_status": {
            "S": "Trip_Start"
        },
        "unhired_duration": {
            "N": "10"
        },
        "eo_lat": {
            "N": "1.2345"
        },
        "reply_eo_ind": {
            "N": "1"
        },
        "license_ind": {
            "N": "0"
        },
        "indiscriminately_parked_ind": {
            "N": "0"
        },
        "eo_lng": {
            "N": "102.8978"
        },
        "officer_id": {
            "S": "[email protected]"
        },
        "case_status": {
            "N": "0"
        },
        "color_status_cd": {
            "N": "0"
        },
        "parking_id": {
            "N": "2345"
        },
        "ttr_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "deployed_ind": {
            "N": "1"
        },
        "status": {
            "S": "PI"
        }
    },
    "SequenceNumber": "1200000000000956615967",
    "SizeBytes": 570,
    "ApproximateCreationDateTime": 1535513040,
    "eventName": "INSERT"
}

Мое заявление о создании таблицы:

create table vehicle_status(
    heart_beat integer,
    cdc_id integer,
    latitude integer,   
    not_deployed_counter integer,
    reg_ind integer,
    operator varchar(10),
    d_dttm varchar(30),
    z_id integer,
    last_end_trip_dttm varchar(30),
    land_ind integer,
    s_ind integer,
    status_change_dttm varchar(30), 
    case_ind integer,
    last_po_change_dttm varchar(30),    
    violated_duration integer,
    vehicle_id varchar(8),
    longitude integer,  
    file_status varchar(30),
    unhired_duration integer,
    eo_lat integer,                     
    reply_eo_ind integer,
    license_ind integer,    
    indiscriminately_parked_ind integer,
    eo_lng integer,
    officer_id varchar(50),
    case_status integer,
    color_status_cd integer,
    parking_id integer,
    ttr_dttm varchar(30),
    deployed_ind varchar(3),
  status varchar(8));

И мое заявление о копировании (вручную пытаюсь повторить это из Redshift):

COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) 
FROM 's3://<my-bucket>/2018/08/29/05/vehicle_status_change-2-2018-08-29-05-24-42-092c330b-e14a-4133-bf4a-5982f2e1f49e.gz' CREDENTIALS 'aws_iam_role=arn:aws:iam::<accountnum>:role/<RedshiftRole>' GZIP json 'auto';

Когда я пытаюсь выполнить описанную выше процедуру, я получаю команду «Вставить записи», но все столбцы и строки пусты.

Как я могу скопировать этот формат json в redhsift. Застряли здесь последние 3 дня. Любая помощь по этому поводу подойдет.

Ведро S3:

Amazon S3/<My-bucket>/2018/08/29/05
Amazon S3/<My-bucket>/manifests/2018/08/29/05

person Prasanna Nandakumar    schedule 29.08.2018    source источник


Ответы (1)


Я не очень хорошо знаком с Amazon, но позвольте мне попытаться ответить на большинство ваших вопросов, чтобы вы могли двигаться дальше. Другие люди могут редактировать этот ответ или дополнительные детали. Спасибо!

Не уверен, каким должно быть заявление о создании таблицы в Redshift

В операторе создания create table vehicle_status(...) нет проблем, хотя вы можете добавить distribution key, sort key и encoding в зависимости от ваших требований, см. подробнее здесь и здесь

Согласно AWS Kenesis документов, ваша таблица должна присутствовать в Redshift, поэтому вы можете подключиться к Redshift с помощью команды psql и запустить create statement вручную.

Каким должен быть синтаксис COPY в Kinesis firhose.

Синтаксис Copy останется прежним, если вы запустите его через psql или firhose, к счастью, сценарий копирования, который вы придумали, работает без ошибок, я пробовал его в своем экземпляре с небольшой модификацией прямого ключа AWS/SECRET, тогда он работает нормально, здесь sql, который я запускал, работал нормально и скопировал 1 запись данных в таблицу vehicle_status.

На самом деле у вас сложная структура пути json, поэтому json 'auto' не будет работать. Вот рабочая команда. Я создал для вас пример файла jsonpath с 4 примерами полей, и вы можете использовать ту же структуру для создания файла jsonpath со всеми точками данных.

 COPY vehicle_status (heart_beat, cdc_id, operator, status) FROM 's3://XXX/development/test_file.json' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://XXX/development/yourjsonpathfile';

И ваш json path file должен иметь содержимое, подобное приведенному ниже.

{
  "jsonpaths": [
    "$['NewImage']['heart_beat']['N']",
    "$['NewImage']['cdc_id']['N']",
    "$['NewImage']['operator']['S']",
    "$['NewImage']['status']['S']"
  ]
}

Я протестировал его, и он работает.

Как мне использовать JsonPaths здесь. Kinesis Data firehouse настроен на возврат только json в мою корзину s3.

Я использовал только данные вашего примера json, и он работает, поэтому я не вижу здесь проблемы.

Как указать манифест в команде COPY

Это хороший вопрос, я мог бы попытаться объяснить его, надеюсь, здесь вы имеете в виду menifest.

Если вы видите приведенную выше команду копирования, она отлично работает для одного или нескольких файлов, но думайте, что у вас много файлов, здесь появляется концепция menifest. Прямо из документов Amazon: «Вместо того, чтобы указывать путь к объекту для команды COPY, вы указываете имя текстового файла в формате JSON, в котором явно перечислены файлы, которые нужно загрузить».

Короче говоря, если вы хотите загрузить несколько файлов за один раз, что также предпочтительно для Redshift, вы можете создать простой menifest с json и указать то же самое в команде копирования.

{ "entries": [ {"url":"s3://mybucket-alpha/2013-10-04-custdata", "mandatory":true}, {"url":"s3://mybucket-alpha/2013-10-05-custdata", "mandatory":true},.... ] }

загрузите манифест в S3 и используйте его в своей команде копирования, как показано ниже.

 COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) FROM 's3://XXX/development/test.menifest' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://yourbucket/jsonpath' menifest;

Вот подробная ссылка на menifest.

Я надеюсь, что это даст вам некоторые идеи о том, как двигаться дальше, и если вы увидите конкретную ошибку, я был бы рад переориентироваться на ответ.

person Red Boy    schedule 31.08.2018
comment
все еще есть та же проблема. вставка выполнена успешно, но для всех столбцов вставляется Null. - person Prasanna Nandakumar; 04.09.2018
comment
@Ruser На самом деле, я забыл позаботиться о файле jsonpath, который я исправил, попробуйте с последней информацией, которую я добавил, она точно сработает, я провел быстрый тест. - person Red Boy; 04.09.2018
comment
Как мы генерируем jsonpath из json. Я могу один раз сделать maunally в создании jsonpath и написать команду копирования прямо в redshift. Однако я хочу, чтобы это было автоматизировано с помощью пожарного шланга Kinesis Data на s3 для красного смещения. - person Prasanna Nandakumar; 04.09.2018
comment
Я думаю, что обычно jsonpaths не меняется очень часто, поэтому я не уверен, поможет ли это в большей степени, на самом деле это может быть менее эффективно, потому что вы можете выполнять файл генерации jsonpath каждый раз. Извините, у меня не так много знаний об автоматической генерации jsonpath. - person Red Boy; 04.09.2018
comment
@RUUser Я надеюсь, что это решило вашу проблему, теперь вы должны видеть данные в Redshift. Имеет ли это? - person Red Boy; 05.09.2018