Введение

В современных серверных ИТ-архитектурах почти необходимо работать с той или иной формой системы хранения. Это может быть внутреннее локальное файловое хранилище или нереляционная (или реляционная) база данных, которая установлена ​​на локальном сервере или (самое дорогое, но удобное решение) в облаке провайдерского сервиса типа Google или >Майкрософт.

Данные очень важны для приложений и часто управляются и «экспортируются» между базами данных из-за требований к программному обеспечению, которое необходимо приложению. Например, база данных Big Query — очень полезный инструмент, но, в зависимости от вашего использования, он стоит денег (большинство вещей в настоящее время стоят 😅), и если вы обнаружите, что постоянно используете его, вы можете столкнуться с некоторыми экономическими проблемами. в будущем. Но как нам использовать данные, если они уже есть в Big Query? Легкий!! Просто скопируйте его в другую базу данных, в которой нет проблем, связанных с предыдущей.

В этой статье мы сделаем именно это! Мы узнаем, как экспортировать таблицу БД Big Query в таблицу БД PSQL с помощью Python и Apache Airflow Архитектура DAG.

Предпосылки

  • Базовые знания архитектуры Apache Airflow Composer и DAG, а также Python.
  • Действительная учетная запись Google Cloud и определенная база данных Big Query.
  • Определенная база данных PostreSQL.

Общий рабочий процесс экспорта

Процедура экспорта, которую мы подробно рассмотрим ниже, имеет следующий рабочий процесс:

Этот рабочий процесс разделен на 2 задачи и добавляется в группу обеспечения доступности баз данных в указанном выше порядке.

Экспорт BQ в хранилище

Этот экспорт довольно прост, поскольку мы можем использовать оператор воздушного потока , который сделает всю тяжелую работу за нас. Чтобы экспортировать таблицу в GS,вы можете просто создать задачу, подобную следующей:

task_export_bq_data_to_storage = BigQueryToCloudStorageOperator(
    dag=dag,
    task_id='export_bq_data_to_storage',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    source_project_dataset_table='INSERT BQ SOURCE TABLE HERE',
    destination_cloud_storage_uris=['gs://' + bucket_name + '/' + file_name],
    export_format='CSV',
    field_delimiter=csv_delimiter,
    print_header=True)

Что делать, если выходной файл слишком большой?

В настоящее время Google позволяет экспортировать в GS только файлы размером до 1 ГБ из таблиц BQ. Это означает, что если вы попытаетесь экспортировать большую таблицу, которая создаст файл размером более 1 Гб, вы получите сообщение об ошибке, указывающее именно на это.

Чтобы решить эту проблему, вам нужно разбить информацию, которую вы хотите экспортировать, на несколько файлов. К счастью, Google предоставляет простой способ сделать это. Просто добавьте * в конце имени файла Google Storage, и он создаст несколько файлов в GS. ведро. Эти файлы будут иметь следующее соглашение об именах: filename-000000000001,filename-000000000002…..

Для получения дополнительной информации по этому вопросу вы можете ознакомиться с официальной документацией Google здесь.

task_export_bq_data_to_storage = BigQueryToCloudStorageOperator(
    dag=dag,
    task_id='export_bq_data_to_storage_task',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    source_project_dataset_table='INSERT BQ SOURCE TABLE HERE',
    destination_cloud_storage_uris=['gs://' + bucket_name + '/' + file_name + '-*'],
    export_format='CSV',
    field_delimiter=csv_delimiter,
    print_header=True)

ПРИМЕЧАНИЕ. Вы не можете контролировать размер этих файлов. Это означает, что если, например, выходной файл имеет размер 1,2 Гб, вы НЕ получите только 2 файла (1 с 1 Гб и 1 с 200 Мб). Google контролирует размеры сегментированных файлов, и они обычно довольно сжаты и «маленькие» около 100/200 МБ каждый.

После того, как эти файлы сгенерированы, разработчик решает, нужно ли ему объединить их в один или импортировать их один за другим. Если вам нужен первый, вы можете создать оператор Python, который сделает это за вас.

task_merge_gs_files = PythonOperator(
    task_id='merge_gs_file_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    python_callable=compose_file,
    op_args=[ "INSERT FIRST FUNCTION ARG", "INSERT SECOND FUNCTION ARG", ],
    provide_context=True,
    dag=dag)

Пример функции compose_file можно найти здесь. Вам придется адаптировать его в соответствии с вашими потребностями.

Прежде чем перейти к следующему этапу рабочего процесса, небольшой совет. Вы можете хранить эти файлы в GS корзине с очень ограниченным TTL, и таким образом вы не будете накапливать все больше и больше данных, которые вы использовать не буду так как (для данного случая использования) это просто переходная СХД. Дополнительную информацию о том, как установить TTL для сегмента, можно найти в официальной документации Google здесь.

Хранилище для экспорта в PSQL

Теперь, когда данные экспортированы в Google Storage, давайте экспортируем их в PSQL! Чтобы это произошло, мы определим функцию, которая подключается к PSQL с помощью соединителя psql Python и копирует всю информацию из файла CSV в конкретный >PSQL таблица.

Эта функция будет иметь следующее определение:

def fromStorage_toPostgres(table, main_query, additional_query, file, columns, pg_connection, **args):
    import urllib.request
    import psycopg2
    # connect to the PostgreSQL database
    conn_info = BaseHook.get_connection(pg_connection)

    conn = psycopg2.connect(host=conn_info.host,
                            database=conn_info.schema,
                            user=conn_info.login,
                            password=conn_info.password)

    cur = conn.cursor()

    if main_query:
        cur.execute(main_query)
        conn.commit()  # <--- makes sure the change is shown in the database

    with urllib.request.urlopen("https://storage.googleapis.com/{bucket}/{name_file}".format(bucket=bucket_name, name_file=file)) as file:
        cur.copy_from(file, table + "_tmp", sep=',', null='', columns=columns)
        conn.commit()
    file.close()

    cur.execute(''' DROP TABLE IF EXISTS {table}_old;
                    ALTER TABLE {table} RENAME TO {table}_old;
                    ALTER TABLE {table}_tmp RENAME TO {table};
                    DROP TABLE IF EXISTS {table}_old;
                   '''.format(table=table))

    conn.commit()  # <--- makes sure the change is shown in the database

    if additional_query:
        cur.execute(additional_query)
        conn.commit()

    conn.close()
    cur.close()

Где аргументы следующие:

  • table: имя целевой таблицы PSQL.
  • main_query: это будет скрипт create, который мы будем использовать для создания темпоральной таблицы, в которой мы будем хранить данные.
  • additional_query: необязательный запрос, в который мы можем добавить индексы, разрешения, fk и т. д.
  • файл: имя исходного файла, который мы хотим экспортировать из Google Storage.
  • столбцы: столбцы, которые вы хотите экспортировать из файла в базу данных psql. Обратите внимание, что количество столбцов и их типы должны соответствовать определению таблицы psql!
  • pg_connection. Переменная подключения, определенная в компоновщике Airflow, которая будет использоваться для подключения к базе данных. Для получения дополнительной информации о переменных подключения посетите страницу Airflow здесь.

Процесс копирования будет использовать вспомогательную таблицу (мы добавляем _tmp для ее указания) и заменит информацию из текущей производственной таблицы только в том случае, если вся информация была экспортирована правильно. На следующей диаграмме показано, как работает этот процесс:

Переменная main_query может выглядеть примерно так:

CREATE TABLE IF NOT EXISTS test_table_tmp
(
    test_field_1 text,
    test_field_2 text,
    test_field_3 text
);

Как только эта функция определена, мы можем построить оператор запроса Python следующим образом:

task_data_from_storage_to_postgres = PythonOperator(
    task_id='data_from_storage_to_postgres_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    python_callable=fromStorage_toPostgres,
    op_args=[ "table_name_HERE", MAIN_QUERY, SECONDARY_QUERY, file_name, ["test_field_1","test_field_2","test_field_3"], 'connection_variable_pg_HERE', ],
    provide_context=True,
    dag=dag)

Что делать, если в значениях CSV есть элементы, соответствующие используемому разделителю?

Это распространенная проблема. Если, например, у вас есть файл CSV, разделенный запятой, и столбцы со значениями, подобными следующему: "Мистер Джонсон, сын мистера Джонсона-старшего"или «Отель Сент-Джон, Сент-Мари, Санта-Барбара». Если вы попытаетесь использовать приведенную выше версию функции fromStorage_toPostgress, вы получите сообщение об ошибке. Чтобы избежать этих символов-разделителей (или просто чтобы они не мешали вашему разделению), вам нужно будет изменить следующую строку:

 cur.copy_from(file, table + "_tmp", sep=',', null='', columns=columns)

to

cur.copy_expert("""copy """ +table+"_tmp" + """ from stdin with (format csv, header, delimiter ',', quote '"')""", file)

Функция copy_expert, используемая во второй строке, в основном представляет собой более «профессиональную» версию функции copy_from. . Он может указывать на определенные характеристики файла, такие как разделитель, заголовки и т. д.

Собираем все вместе

Теперь, когда все функции определены, можно объявить рабочий процесс DAG. Используя имена, упомянутые выше, для самого простого случая, когда вам не нужно шардировать или что-то еще, рабочий процесс будет следующим:

import sys
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.hooks.base_hook import BaseHook

# Global constants #
dag_name = 'bq_to_psql_dag'
today_date = datetime.today()
yesterday_date = today_date - timedelta(days=1)
csv_delimiter = ','
bucket_name = 'INSERT_BUCKET_NAME_HERE'
file_name = 'INSERT_FILE_NAME_HERE.csv'

# Queries
MAIN_QUERY = '''
CREATE TABLE IF NOT EXISTS INSERT_TABLE_NAME_HERE_tmp
(
    test_field_1 text,
    test_field_2 text,
    test_field_3 text
);
'''

SECONDARY_QUERY = '''
CREATE INDEX idx_INSERT_TABLE_NAME_HERE_field_1
ON INSERT_TABLE_NAME_HERE(test_field_1);
'''

# ------------------- (0) -----------------------
# DAG data definition
default_args = {
    'owner': 'team-3',
    'start_date': datetime.combine(yesterday_date, datetime.min.time()),
    'retries': 0}

dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=None)

# ------------------- (1) -----------------------
# Function definitions
def fromStorage_toPostgres(table, main_query, additional_query, file, columns, pg_connection, **args):
    import urllib.request
    import psycopg2
    # connect to the PostgreSQL database
    conn_info = BaseHook.get_connection(pg_connection)

    conn = psycopg2.connect(host=conn_info.host,
                            database=conn_info.schema,
                            user=conn_info.login,
                            password=conn_info.password)

    cur = conn.cursor()


    if main_query:
        cur.execute(main_query)
        conn.commit()  # <--- makes sure the change is shown in the database

    with urllib.request.urlopen("https://storage.googleapis.com/{bucket}/{name_file}".format(bucket=bucket_name, name_file=file)) as file:
        cur.copy_from(file, table + "_tmp", sep=',', null='', columns=columns)
        conn.commit()
    file.close()

    cur.execute(''' DROP TABLE IF EXISTS {table}_old;
                    ALTER TABLE {table} RENAME TO {table}_old;
                    ALTER TABLE {table}_tmp RENAME TO {table};
                    DROP TABLE IF EXISTS {table}_old;
                   '''.format(table=table))

    conn.commit()  # <--- makes sure the change is shown in the database

    if additional_query:
        cur.execute(additional_query)
        conn.commit()

    conn.close()
    cur.close()

# ------------------- (2) -----------------------
# Operator definitions
task_export_bq_data_to_storage = BigQueryToCloudStorageOperator(
    dag=dag,
    task_id='export_bq_data_to_storage',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    source_project_dataset_table='INSERT BQ SOURCE TABLE HERE',
    destination_cloud_storage_uris=['gs://' + bucket_name + '/' + file_name],
    export_format='CSV',
    field_delimiter=csv_delimiter,
    print_header=True)

task_data_from_storage_to_postgres = PythonOperator(
    task_id='data_from_storage_to_postgres_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    python_callable=fromStorage_toPostgres,
    op_args=[ "INSERT_TABLE_NAME_HERE", MAIN_QUERY, SECONDARY_QUERY, file_name, ["test_field_1","test_field_2","test_field_3"], 'connection_variable_pg_HERE', ],
    provide_context=True,
    dag=dag)

# ------------------- (3) -----------------------
# DAG workflow definition
task_export_bq_data_to_storage >> task_data_from_storage_to_postgres

Заключение

Несмотря на то, что это очень «ориентированное на реализацию» решение, оно демонстрирует простой способ, с помощью которого служба может экспортировать свои данные из одной системы баз данных в другую. Этот экспорт может иметь несколько применений, например, миграцию БД. Кроме того, это может быть очень полезно для упрощения данных. Вы можете использовать почти тот же код для экспорта результата запроса из Big Query в таблицу PSQL (а не всю таблицу Big Query).

В целом, я считаю, что это довольно простое решение проблемы, которая может немного разочаровать разработчиков, работающих с несколькими базами данных в своих системах.

Наконец, я хотел бы поблагодарить мою команду в TravelgateX, которая помогла мне разработать это решение, поскольку я относительно новичок в Big Query и Airflow! Дэвид, Марина, Камила, Джоан и Рауль, спасибо, ребята ❤️

Как всегда, не стесняйтесь обращаться ко мне, если вам нужна помощь с чем-то, что я упомянул, или у вас есть предложение о том, как я могу улучшить эту статью. Вы можете написать мне в мой твиттер в любое время.

Удачного кодирования :)