Поддерживает ли Amazon Kinesis Firehose программную поддержку преобразования данных?

У меня есть случай использования, в котором я должен убедиться, что полезные данные, отправленные в пожарный шланг Kinesis, действительно отправляются.

Для этого я придумал цепочку Firehose -> Firehose Data transform (using lambda) -> DDB -> Check for payload in DDB (payload is the hashkey in the DDB). Я должен программно определить всю эту цепочку одним выстрелом. Преобразование данных аналогично http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html.

Я делаю все это, так как не могу полностью контролировать имя файла в ведре S3, к которому он идет. Поэтому мне нужно, чтобы точная полезная нагрузка была отправлена ​​в какое-то постоянное хранилище значений ключа.

Проблема в

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/CreateDeliveryStreamRequest.html, похоже, не поддерживает добавление лямбда-выражения преобразования данных.

У меня вопрос: можно ли это сделать, даже не касаясь консоли (полностью через API AWS Kinesis Firehose).

Или есть альтернативные предложения по перемещению данных в DDB.


person Arunav Sanyal    schedule 27.04.2017    source источник


Ответы (2)


Итак, я понял это после долгих усилий и поиска документации.

Вам нужно будет определить конфигурацию обработки с лямбда-ARN, чтобы определить преобразование данных.

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/ProcessingConfiguration.html

Вот как такая конфигурация будет выглядеть в коде.

final ProcessingConfiguration processingConfiguration =
      new ProcessingConfiguration().withEnabled(true)
         .withProcessors(newProcessor().withType(ProcessorType.Lambda)
         .withParameters(new ProcessorParameter().withParameterName(LambdaArn)
         .withParameterValue(lamdbaFunctionArn)));

final CreateDeliveryStreamResult describeDeliveryStreamResult =
      client.createDeliveryStream(new CreateDeliveryStreamRequest().withExtendedS3DestinationConfiguration(
         new ExtendedS3DestinationConfiguration()
             .withBucketARN(s3BucketARN)
             .withRoleARN(roleArn)
             .withPrefix(keyPrefix)
             .withProcessingConfiguration(processingConfiguration))
             .withDeliveryStreamName(streamName));

Здесь ARN - это имена ресурсов для различных объектов (назначение S3, лямбда преобразования данных, роль IAM и т. Д.).

person Arunav Sanyal    schedule 30.04.2017

У вас может быть лямбда-функция для определения задачи. https://github.com/hixichen/golang_lamda_decode_protobuf_firehose

person xichen    schedule 25.07.2018