Как интегрировать автоматическую обработку CSV с приемом Quicksight через aws Lambda?

Я работал над проектом, который использует довольно простой конвейер данных для очистки и преобразования необработанных файлов csv в обработанные данные с использованием Python3.8 и Lambda для создания различных подмножеств, которые отправляются в соответствующие корзины S3. Функция Lambda запускается путем загрузки необработанного CSV-файла в приемную корзину S3, которая инициирует процесс.

Однако я хотел бы также отправить некоторые из этих обработанных данных непосредственно в Quicksight для приема из той же функции Lambda для визуальной проверки, и именно здесь я сейчас застрял.

Часть функции (без импорта) у меня есть только с обработкой CSV и загрузкой в ​​S3, и это часть, которая мне нравится напрямую загружать в Quicksight:

def featureengineering(event, context):
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    s3_file_name =  event['Records'][0]['s3']['object']['key']
    read_file = s3_client.get_object(Bucket=bucket_name,Key=s3_file_name)
   
    #turning the CSV into a dataframe in AWS Lambda
    s3_data = io.BytesIO(read_file.get('Body').read())
    df = pd.read_csv(s3_data, encoding="ISO-8859-1")

    #replacing erroneous zero values to nan (missing) which is more accurate and a general table,
    #and creating a new column with just three stages instead for simplification
    df[['Column_A','Column_B']] = df[['Column_A','Column_B']].replace(0,np.nan) 
    #applying function for feature engineering of 'newstage' function    
    df['NewColumn'] = df.Stage.apply(newstage) 
    
    df1 = df
    df1.to_csv(csv_buffer1)
    s3_resource.Object(bucket1, csv_file_1).put(Body=csv_buffer1.getvalue()) #downloading df1 to S3

Итак, в тот момент, когда df1 отправляется в его ведро S3 (что отлично работает), но я бы хотел, чтобы он напрямую загружался в Quicksight в качестве автоматического обновления специй.

Покопавшись, я нашел похожий вопрос с ответом

import boto3
import time
import sys
client = boto3.client('quicksight')
response = client.create_ingestion(DataSetId='<dataset-id>',IngestionId='<ingetion-id>',AwsAccountId='<aws-account-id>')

но зависание, которое у меня происходит, находится в DataSetId или в более общем плане, как мне превратить pandas DataFrame df1 в лямбда-функции во что-то, что API CreateIngestion может принять и автоматически отправить в QuickSight как автоматизированный приправить обновлением самых последних обработанных данных?


person JLuu    schedule 23.09.2020    source источник


Ответы (1)


Сначала вам следует создать набор данных Quicksight, цитируя из документы:

Набор данных определяет конкретные данные в источнике данных, которые вы хотите использовать. Например, источником данных может быть таблица, если вы подключаетесь к источнику данных базы данных. Это может быть файл, если вы подключаетесь к источнику данных Amazon S3.

После того, как вы сохранили свой DataFrame на S3 (как файл csv или parquet), вы можете создать набор данных Quicksight, который будет получать из него данные.

Вы можете сделать это через Console или программно (возможно, то, что вы ищете).

Наконец, получив идентификатор набора данных, вы можете ссылаться на него в других вызовах API Quicksight.

person Andre.IDK    schedule 23.09.2020
comment
Интересно. Значит, это нельзя сделать напрямую из самой обрабатывающей лямбда-функции? Нужно ли будет изменять / обновлять DataSetID с каждым новым обрабатываемым CSV? Если так, то это, казалось бы, лишает возможности полностью автоматизировать прием. - person JLuu; 24.09.2020
comment
Да, это можно сделать в той же лямбда-функции. Третья ссылка (программная) показывает, как это сделать с помощью AWS SDK. - person Andre.IDK; 24.09.2020
comment
Привет, @ Adnre.IDK, спасибо. Просто хотел уточнить, однако, в вашем исходном ответе и в документации по созданию набора данных Quicksight указано, что он будет извлекать его из S3, поэтому обрабатываемый DataFrame сначала должен быть загружен в S3 Bucket, но SDK, отправляющий его в QuickSight может по-прежнему находиться в лямбда-функции, выполняющей обработку? Я просто обдумываю шаги и время, когда что-то запускается и перемещается по конвейеру данных. Интересно, что DataFrame нельзя отправить напрямую из Lambda в QuickSight, как в S3. - person JLuu; 24.09.2020
comment
Чтобы понять, что такое набор данных, я бы посоветовал вам думать о нем как о ссылке на какие-то данные, а не как о фактическом DataFrame. Сам QuickSight не хранит данные, а использует объекты, называемые наборами данных, для отображения данных, на которые имеются ссылки. Это означает, что, по крайней мере, в вашем случае, чтобы сделать некоторые данные доступными для QuickSight, вам нужно будет прочитать данные, обработать их, сохранить на S3, создать набор данных QuickSight, который указывает на эти данные на S3, а затем, наконец, скажите QuickSight использовать его. Все это может быть в одной лямбде при условии, что в ней можно обрабатывать время выполнения и сложность. - person Andre.IDK; 24.09.2020
comment
это супер интересно и полезно знать. Спасибо! - person JLuu; 25.09.2020