Распространенный вариант использования в жизненном цикле машинного обучения Data Engineering - доступ к последним обучающим данным, чтобы предотвратить ухудшение модели. Специалисты по обработке данных часто находят обременительным вручную экспортировать данные из таких источников, как реляционные базы данных, хранилища данных NoSQL или даже распределенные данные. Это требует автоматизации конвейера разработки данных в машинном обучении. В этом посте мы расскажем, как настроить этот конвейер для пакетных данных. Этот рабочий процесс управляется с помощью Airflow и может быть настроен для работы с регулярными интервалами: например, ежечасно, ежедневно, еженедельно и т. Д., В зависимости от конкретных бизнес-требований.

Краткое примечание - если вы заинтересованы в создании конвейера обработки данных в реальном времени для машинного обучения, прочтите этот пост

В этом случае мы собираемся экспортировать данные MongoDB в Google BigQuery через облачное хранилище. Обновленные данные в BigQuery затем становятся доступными в Jupyter Notebook в виде фрейма данных Pandas для последующего построения моделей и аналитики. Поскольку конвейер автоматизирует прием и предварительную обработку данных, специалисты по обработке данных всегда имеют доступ к последним пакетным данным в своих блокнотах Jupyter, размещенных на платформе Google AI.

У нас есть служба MongoDB, работающая в одном экземпляре, и у нас есть Airflow и mongoexport, работающие в докере в другом экземпляре. Mongoexport - это утилита, которая производит экспорт данных в формате JSON или CSV, хранящихся в MongoDB. Теперь данные в MongoDB должны быть извлечены и преобразованы с помощью mongoexport и загружены в CloudStorage. Airflow используется для планирования и управления этим экспортом. Как только данные будут доступны в CloudStorage, их можно будет запросить в BigQuery. Затем мы получаем эти данные из BigQuery в Jupyter Notebook. Ниже приводится пошаговая последовательность шагов по настройке этого конвейера данных.

Вы можете создать экземпляр в GCP, перейдя в Compute Engine. Нажмите создать экземпляр.

Install.sh:

sudo apt-get update
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
sudo usermod -aG docker $USER
sudo apt-get install -y python-pip
export AIRFLOW_HOME=~/airflow
sudo pip install apache-airflow
sudo pip install apache-airflow[postgres,s3]
airflow initdb
airflow webserver -p 8080 -D
airflow scheduler -D
sudo docker pull mongo
sudo docker run --name mongo_client -d mongo

Запустите файл install.sh с помощью команды ./install.sh (убедитесь, что файл является исполняемым), которая установит Docker, Airflow, загрузит образ Mongo и запустит изображение mongo в контейнере с именем mongo_client.

После установки для веб-интерфейса Airflowhttp: // ‹public-ip-instance›: 8080 (вам может потребоваться открыть порт 8080 в сети только для вашего общедоступного IP-адреса)

Убедитесь, что у сервисного аккаунта Google в запущенном экземпляре есть разрешения на доступ к Big Query и облачному хранилищу. После установки добавьте файл Python задания воздушного потока (mongo-export.py) в папку airflow / dags.

Перед запуском файла Python убедитесь, что вы создали набор данных и таблицу в BigQuery. Также измените соответствующие значения для исходной базы данных MongoDB, исходной таблицы MongoDB, целевого сегмента облачного хранилища и целевого набора данных BigQuery в файле Python задания Airflow (mongo-export.py). Имя целевой таблицы Big Query совпадает с именем исходной таблицы в Mongo DB.

Mongo-export.py:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json
from pandas.io.json import json_normalize

# Following are default arguments which could be overridden
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

bucket_name = '<Your_Bucket>'
db_name = '<Database_Name>'
dataset = '<Dataset_Name>'
table_name = '<Table_Name>'


time_stamp = datetime.now()
cur_date = time_stamp.strftime("%Y-%m-%d")

# It will flatten the nested json
def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

def convert_string(y):
    string_type = {}

    def convert(x, name=''):
        if type(x) is dict:
            for a in x:
                convert(str(x[a]), name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            string_type[name[:-1]] = x

    convert(y)
    return string_type


def json_flat():
    lines = [line.rstrip('\n') for line in open('/home/dev/'+ table_name + '-unformat.json')]
    flat_list = []
    for line in lines:
        line = line.replace("\"$", "\"")
        line = json.loads(line)
        try:
            flat_list.append(json.dumps(convert_string(flatten_json(line))))
        except Exception as e:
            print(e)
    flatted_json = '\n'.join(i for i in flat_list)

    with open('/home/dev/' + table_name + '.json', 'a') as file:
        file.write(flatted_json)
    return flatted_json 

dag = DAG('mongoexport-daily-gcs-bq', default_args=default_args, params = {'cur_date': cur_date, 'db_name': db_name, 'table_name': table_name, 'dataset': dataset, 'bucket_name': bucket_name})
#exports provide a table data into docker container 
t1 = BashOperator(
    task_id='mongoexport_to_container',
    bash_command='sudo docker exec -i mongo_client sh -c "mongoexport --host=<mongo_instance_ip> --db {{params.db_name}} --collection {{params.table_name}} --out {{params.table_name}}-unformat.json"',
    dag=dag)

# copies exported file into instance

t2 = BashOperator(
    task_id='cp_from_container_instance',
    bash_command='sudo docker cp mongo_client:/{{params.table_name}}-unformat.json /home/dev/',
    dag=dag)

t3 = PythonOperator(
    task_id='flattening_json',
    python_callable=json_flat,
    dag=dag)
# copies the flatten data from cloud storage
t4 = BashOperator(
    task_id='cp_from_instance_gcs',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/raw/{{params.table_name}}/date={{params.cur_date}}/',
    dag=dag)
# 
t5 = BashOperator(
    task_id='cp_from_instance_gcs_daily_data',
    bash_command='gsutil cp /home/dev/{{params.table_name}}.json gs://{{params.bucket_name}}/curated/{{params.table_name}}/',
    dag=dag)

# removes the existing bigquery table
t6 = BashOperator(
    task_id='remove_bq_table',
    bash_command='bq rm -f {{params.dataset}}.{{params.table_name}}',
    dag=dag)
# creates a table in bigquery
t7 = BashOperator(
    task_id='create_bq_table',
    bash_command='bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON {{params.dataset}}.{{params.table_name}} gs://{{params.bucket_name}}/curated/{{params.table_name}}/{{params.table_name}}.json',
    dag=dag)
# removes data from container
t8 = BashOperator(
    task_id='remove_file_from_container',
    bash_command='sudo docker exec -i mongo_client sh -c "rm -rf {{params.table_name}}*.json"',
    dag=dag)
# removes data from instance
t9 = BashOperator(
    task_id='remove_file_from_instance',
    bash_command='rm -rf /home/dev/{{params.table_name}}*.json',
    dag=dag)

t1 >> t2
t2 >> t3
t3 >> [t4, t5]
[t4, t5] >> t6
t6 >> t7
t7 >> [t8, t9]

Затем запустите файл python, используя python ‹file-path› .py.

(пример: python airflow / dags / mongo-export.py).

После запуска файла python имя тега отображается в веб-интерфейсе Airflow. И вы можете запустить даг вручную. Убедитесь, что кнопка переключения находится в состоянии ВКЛ.

После завершения задания данные сохраняются в корзине, а также доступны в целевой таблице в BigQuery. Вы могли видеть, что таблица создана в BigQuery. Нажмите таблицу запросов, чтобы выполнить операции SQL, и вы сможете увидеть свои результаты на вкладке предварительного просмотра внизу.

Теперь вы можете получить доступ к данным в Jupyter Notebook из BigQuery. Найдите ноутбук в консоли GCP.

Выполните следующие команды в Jupyter Notebook.

from google.cloud import bigquery
client = bigquery.Client()
sql = """
SELECT * FROM 
`<project-name>.<dataset-name>.<table-name>`
"""
df = client.query(sql).to_dataframe()
df.head(10)

Это загружает данные BigQuery в фреймворк Pandas и может использоваться для создания модели при необходимости. Позже, когда конвейер данных будет запущен по расписанию, обновленные данные будут автоматически доступны в записной книжке Jupyter через этот SQL-запрос.

Надеюсь, это поможет вам автоматизировать конвейер пакетной обработки данных для машинного обучения.

Соавторами этой истории являются Сантош и Суббаредди. Сантош - облачный инженер, а Суббаредди - инженер по большим данным.

Первоначально опубликовано на http://blog.zenof.ai 13 июля 2019 г.