AWS Data Pipeline: загрузка CSV-файла из S3 в DynamoDB

Я пытаюсь перенести данные CSV из S3 в DynamoDB с помощью Data Pipeline. Данные находятся не в формате экспорта DynamoDB, а в обычном CSV.

Я понимаю, что Data Pipeline чаще используется для импорта или экспорта формата DynamoDB, а не для стандартного CSV. Я думаю, что прочитал в моем поиске в Google, что можно использовать обычный файл, но мне не удалось собрать что-то, что работает. Документация AWS тоже не очень помогла. Мне не удалось найти справочные сообщения, которые были относительно недавними (‹2 года назад)

Если это возможно, может ли кто-нибудь дать некоторое представление о том, почему мой конвейер может не работать? Я вставил конвейер и сообщение об ошибке ниже. Ошибка, кажется, указывает на проблему с подключением данных к Dynamo, я предполагаю, потому что это не в формате экспорта.

Я бы сделал это в Lambda, но загрузка данных занимает больше 15 минут.

Спасибо

{
  "objects": [
    {
      "myComment": "Activity used to run the hive script to import CSV data",
      "output": {
        "ref": "dynamoDataTable"
      },
      "input": {
        "ref": "s3csv"
      },
      "name": "S3toDynamoLoader",
      "hiveScript": "DROP TABLE IF EXISTS tempHiveTable;\n\nDROP TABLE IF EXISTS s3TempTable;\n\nCREATE EXTERNAL TABLE tempHiveTable (#{myDDBColDef}) \nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{myDDBTableName}\", \"dynamodb.column.mapping\" = \"#{myDDBTableColMapping}\");\n                    \nCREATE EXTERNAL TABLE s3TempTable (#{myS3ColDef}) \nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{myInputS3Loc}';\n                    \nINSERT OVERWRITE TABLE tempHiveTable SELECT * FROM s3TempTable;",
      "id": "S3toDynamoLoader",
      "runsOn": { "ref": "EmrCluster" },
      "stage": "false",
      "type": "HiveActivity"
    },
    {
      "myComment": "The DynamoDB table that we are uploading to",
      "name": "DynamoDB",
      "id": "dynamoDataTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}",
      "writeThroughputPercent": "1.0",
      "dataFormat": {
        "ref": "DDBTableFormat"
      }
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{myLogUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrCluster",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "releaseLabel": "emr-5.29.0",
      "id": "EmrCluster",
      "type": "EmrCluster",
      "terminateAfter": "2 Hours"
    },
    {
      "myComment": "The S3 file that contains the data we're importing",
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "csvFormat"
      },
      "name": "S3DataNode",
      "id": "s3csv",
      "type": "S3DataNode"
    },
    {
      "myComment": "Format for the S3 Path",
      "name": "S3ExportFormat",
      "column": "not_used STRING",
      "id": "csvFormat",
      "type": "CSV"
    },
    {
      "myComment": "Format for the DynamoDB table",
      "name": "DDBTableFormat",
      "id": "DDBTableFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColDef",
      "default": "phoneNumber string,firstName string,lastName string, spend double",
      "type": "String"
    },
    {
      "description": "DynamoDB Column Mappings",
      "id": "myDDBColDef",
      "default": "phoneNumber String,firstName String,lastName String, spend double",
      "type": "String"
    },
    {
      "description": "Input S3 foder",
      "id": "myInputS3Loc",
      "default": "s3://POCproject-dev1-data/upload/",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "myDDBTableName",
      "default": "POCproject-pipeline-data",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "myDDBTableColMapping",
      "default": "phoneNumber:phoneNumber,firstName:firstName,lastName:lastName,spend:spend",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "myLogUri",
      "default": "s3://POCproject-dev1-data/",
      "type": "AWS::S3::ObjectKey"
    }
  ]
}

Ошибка

[INFO] (TaskRunnerService-df-09432511OLZUA8VN0NLE_@EmrCluster_2020-03-06T02:52:47-0) df-09432511OLZUA8VN0NLE amazonaws.datapipeline.taskrunner.LogMessageUtil: Returning tail errorMsg :Caused by: java.lang.RuntimeException: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: UM56KGVOU511P6LS7LP1N0Q4HRVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.handleException(DynamoDBFibonacciRetryer.java:108)
    at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:83)
    at org.apache.hadoop.dynamodb.DynamoDBClient.writeBatch(DynamoDBClient.java:258)
    at org.apache.hadoop.dynamodb.DynamoDBClient.putBatch(DynamoDBClient.java:215)
    at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:112)
    at org.apache.hadoop.hive.dynamodb.write.HiveDynamoDBRecordWriter.write(HiveDynamoDBRecordWriter.java:42)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:762)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130)
    at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:148)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
    ... 18 more
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: An AttributeValue may not contain an empty string (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: UM56KGVOU511P6LS7LP1N0Q4HRVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)

person Mike S.    schedule 11.03.2020    source источник


Ответы (1)


Вы уже пробовали этот образец? Он использует hive для импорта файла CSV в таблицу DynamoDB https://github.com/aws-samples/data-pipeline-samples/tree/master/samples/DynamoDBImportCSV

person jmp    schedule 12.03.2020
comment
Я основал свой сценарий на этом, у меня никогда не получалось работать без ошибок. Я не уверен, где я ошибся. - person Mike S.; 12.03.2020