Запись паркета из пожарного шланга AWS Kinesis в AWS S3

Я хочу загрузить данные в s3 из пожарного шланга Kinesis, отформатированного как паркет. Пока я только что нашел решение, которое подразумевает создание EMR, но я ищу что-то более дешевое и быстрое, например, сохранить полученный json как паркет прямо из пожарного шланга или использовать функцию Lambda.

Большое спасибо, Хави.


person bracana    schedule 01.08.2017    source источник


Ответы (3)


Хорошие новости, эта функция была выпущена сегодня!

Amazon Kinesis Data Firehose может преобразовывать формат ваших входных данных из JSON в Apache Parquet или Apache ORC перед сохранением данных в Amazon S3. Parquet и ORC - это столбчатые форматы данных, которые экономят место и позволяют выполнять более быстрые запросы.

Чтобы включить, перейдите в поток Firehose и нажмите Edit. Вы должны увидеть раздел Преобразование формата записи, как на скриншоте ниже:

введите описание изображения здесь

Подробнее см. В документации: https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html.

person Vlad Holubiev    schedule 11.05.2018

Разобравшись со службой поддержки AWS и сотней различных реализаций, я хотел бы объяснить, чего я достиг.

Наконец, я создал лямбда-функцию, которая обрабатывает каждый файл, созданный Kinesis Firehose, классифицирует мои события в соответствии с полезной нагрузкой и сохраняет результат в файлах Parquet в S3.

Сделать это непросто:

  1. Прежде всего, вы должны создать виртуальную среду Python, включая все необходимые библиотеки (в моем случае Pandas, NumPy, Fastparquet и т. Д.). Поскольку полученный файл (который включает в себя все библиотеки и мою лямбда-функцию тяжелый, необходимо запустить экземпляр EC2, я использовал тот, который включен в бесплатный уровень). Чтобы создать виртуальную среду, выполните следующие действия:

    • Login in EC2
    • Создайте папку с именем лямбда (или с любым другим именем)
    • Sudo yum -y обновить
    • Sudo yum -y апгрейд
    • sudo yum -y groupinstall "Инструменты разработки"
    • sudo yum -y установить blas
    • sudo yum -y установить лапак
    • sudo yum -y установить atlas-sse3-devel
    • sudo yum установить python27-devel python27-pip gcc
    • Virtualenv env
    • исходный env / bin / активировать
    • pip install boto3
    • pip install fastparquet
    • pip install pandas
    • pip install thriftpy
    • pip install s3fs
    • pip install (любая другая необходимая библиотека)
    • найти ~ / lambda / env / lib * / python2.7 / site-packages / -name "* .so" | полоса xargs
    • pushd env / lib / python2.7 / сайт-пакеты /
    • zip -r -9 -q ~ / lambda.zip *
    • Popd
    • pushd env / lib64 / python2.7 / сайт-пакеты /
    • zip -r -9 -q ~ / lambda.zip *
    • Popd
  2. Правильно создайте lambda_function:

    import json
    import boto3
    import datetime as dt
    import urllib
    import zlib
    import s3fs
    from fastparquet import write
    import pandas as pd
    import numpy as np
    import time
    
    def _send_to_s3_parquet(df):
        s3_fs = s3fs.S3FileSystem()
        s3_fs_open = s3_fs.open
        # FIXME add something else to the key or it will overwrite the file 
        key = 'mybeautifullfile.parquet.gzip'
        # Include partitions! key1 and key2
        write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
                compression='GZIP',open_with=s3_fs_open)            
    
    def lambda_handler(event, context):
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
        try:
            s3 = boto3.client('s3')
            response = s3.get_object(Bucket=bucket, Key=key)
            data = response['Body'].read()
            decoded = data.decode('utf-8')
            lines = decoded.split('\n')
            # Do anything you like with the dataframe (Here what I do is to classify them 
            # and write to different folders in S3 according to the values of
            # the columns that I want
            df = pd.DataFrame(lines)
            _send_to_s3_parquet(df)
        except Exception as e:
            print('Error getting object {} from bucket {}.'.format(key, bucket))
            raise e
    
  3. Скопируйте лямбда-функцию в lambda.zip и разверните lambda_function:

    • Go back to your EC2 instance and add the lambda function desired to the zip: zip -9 lambda.zip lambda_function.py (lambda_function.py is the file generated in the step 2)
    • Скопируйте сгенерированный zip-файл на S3, так как его очень сложно развернуть, не выполняя его через S3. aws s3 cp lambda.zip s3: // ведро поддержки / lambda_packages /
    • Разверните лямбда-функцию: aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages / lambda.zip
  4. Запустите функцию, которая будет выполняться, когда захотите, например, каждый раз, когда новый файл создается в S3, или даже вы можете связать лямбда-функцию с Firehose. (Я не выбирал этот вариант, потому что пределы 'лямбда' ниже, чем ограничения Firehose, вы можете настроить Firehose для записи файла каждые 128 МБ или 15 минут, но если вы свяжете эту лямбда-функцию с Firehose, лямбда-функция будет выполнена каждые 3 минуты или 5 МБ, в моем случае у меня была проблема с генерацией большого количества маленьких паркетных файлов, поскольку каждый раз, когда запускается лямбда-функция, я генерирую не менее 10 файлов).

person bracana    schedule 02.10.2017
comment
Правильно ли я понимаю, что этот конвейер создает один паркетный файл на запись? Паркет, будучи хранилищем в виде столбцов, тогда потребовалось бы какое-то отдельное задание по уплотнению, чтобы объединить эти крошечные паркетные файлы в одну большую? - person Tagar; 20.08.2018
comment
Привет @Tagar, он записывает паркетный файл каждый раз, когда вызывается lamba_handler, и это можно настроить, вы можете настроить его для запуска, например, каждые 15 минут, и это будет создавать файл каждые 15 минут со всеми событиями, полученными на этот раз. - person bracana; 27.08.2018

Amazon Kinesis Firehose получает записи потоковой передачи и может хранить их в Amazon S3 (или Amazon Redshift или Amazon Elasticsearch Service).

Каждая запись может иметь размер до 1000 КБ.

Kinesis flow

Однако записи добавляются вместе в текстовый файл с группировкой в ​​зависимости от времени или размера. Традиционно записи имеют формат JSON.

Вы не сможете отправить файл паркета, потому что он не соответствует этому формату файла.

Можно запустить функцию преобразования лямбда-данных, но она также не сможет выводить файл паркета.

Фактически, учитывая природу паркетных файлов, маловероятно, что вы сможете создавать их по одной записи за раз. Я подозреваю, что поскольку они представляют собой столбчатый формат хранения, их действительно нужно создавать в пакетном режиме, а не добавлять данные для каждой записи.

Итог: Нет.

person John Rotenstein    schedule 01.08.2017
comment
Большое спасибо, Джон. - person bracana; 01.08.2017
comment
Привет, @Javi, если этот или какой-либо ответ помог решить ваш вопрос, пожалуйста, примите его, нажав галочку. Это показывает широкому сообществу, что вы нашли решение, и дает некоторую репутацию как автору, так и вам. Это не обязательно. - person John Rotenstein; 01.08.2017
comment
@JohnRotenstein Не могли бы вы сделать лямбда-преобразование для каждого буферизованного пакета времени / размера из Firehose, а затем объединять файлы Parquet вместе до большего размера каждые несколько часов или около того? Это позволяет передавать данные в формате JSON в Parquet через Firehose, чтобы получать данные в Athena в режиме, близком к реальному времени, и при этом получать выгоду от Parquet. - person chris.mclennon; 16.01.2018
comment
@cmclen, Parquet - это столбчатый формат файла. Я не думаю, что вы могли бы просто добавлять по одной строке за раз - это лишило бы смысла использование Parquet. - person John Rotenstein; 16.01.2018
comment
@JohnRotenstein вы не могли (до 12 дней назад: см. Ответ Влада) полагаться на то, что Firehose сбрасывает преобразованные данные для вас в S3, но вы можете самостоятельно записывать файлы с помощью S3FS или тому подобное, как указала bracana. Вам просто нужно вернуть правильно отформатированные строки для Firehose, если вы хотите, чтобы они отображались как успешные (обычно просто добавьте метку времени processing_at и верните входные строки как есть). Также возможно сделать это напрямую в лямбде, если вы не полагаетесь на pandas, библиотека слишком велика, чтобы иметь возможность упаковать ее в лямбду (максимум 50 МБ). - person Bluu; 23.05.2018