Настройка конвейера ETL с помощью нескольких команд

Чего не следует ожидать от этого блога? Управляемые решения ETL, такие как AWS Glue, AWS Data Migration Service или Apache Airflow. Облачные технологии управляются, но не бесплатны. И не рассматриваются в этой статье.

Оглавление

  1. Что такое конвейер ETL?
  2. Каковы различные варианты использования конвейера ETL?
  3. Предварительные требования для ETL - Docker + Debezium + Kafka + Kafka Connect - Вид с высоты птичьего полета
  4. Настройка ETL - четырехэтапный процесс

1: Что такое ETL?

ETL расшифровывается как Extract Transform Load pipeline. И он используется для создания хранилища данных или озера данных.

Примечание. Хранилище данных собирает несколько структурированных источников данных, таких как реляционные базы данных, но в озере данных мы храним как структурированные, так и неструктурированные данные.

2. Каковы различные варианты использования конвейера ETL?

ETL имеет следующие широкие варианты использования:

  • Придание структуры неструктурированным данным, потому что мы храним их в хранилище данных, которое обычно мы используем для хранения структурированных данных из нескольких ресурсов.
  • Конвейер данных для инженеров машинного обучения для получения данных для обучения моделей. И довольно часто это первая задача для инженера машинного обучения / специалиста по данным (L1).
  • Для создания резервной копии или промежуточного источника данных.

Примечание. В этом примере мы будем использовать источник как базу данных MySQL и место назначения как Elasticsearch, который по своей сути интегрирован с Kibana для визуализации данных и машинного обучения.

3: предварительные требования к ETL - Docker + Debezium + Kafka + Kafka Connect - вид с высоты птичьего полета

Как видно из рисунка выше, мы собираемся использовать следующее:

  • Docker. Система управления контейнерами (CMS). Мы используем Docker для простоты. Https://www.docker.com/
  • Debezium: Debezium - это не что иное, как система отслеживания измененных данных (CDC). Которая отслеживает каждое событие (вставка, обновление, удаление) из исходной БД и отправляет событие в Kafka с помощью Kafka Connect. Он использует журналы исходной базы данных для чтения каждой транзакции и создания события для конкретной транзакции.

Примечание. В случае MySQL мы назвали его binlog, а в случае PostgreSQL мы назвали его wal-logs (Write Ahead Log).

  • Kafka Connect: как следует из названия, он помогает Debezium соединяться с Kafka.
  • Kafka: Kafka помогает в потоковой передаче и обработке событий в режиме реального времени. Kafka работает с Zookeeper для отслеживания событий. Https://bit.ly/2Gb9Sm7
  • ELK (пункт назначения): мы рассматриваем Elasticsearch в качестве источника данных назначения, который по умолчанию также интегрирован с Kibana для визуализации данных и машинного обучения, это популярно и известно как Elasticsearch +. Logstash + Kibana (стек ELK) https://bit.ly/36dmioe

4. Приступим к настройке - четырехэтапный процесс

Шаг 1:. Изменение формата двоичного журнала MySQL, который нравится Debezium: просто перейдите в /etc/my.cnf, в основном в файл конфигурации MySQL, и добавьте следующие конфигурации:

Шаг 2: Запуск Zookeeper, Kafka и Kafka Connect с помощью Docker:

$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper 
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.0

Примечание. Если вы знакомы с Docker, вы можете использовать docker-compose.yaml. Вы можете найти его здесь: https://github.com/debezium/debezium-examples/blob/master/tutorial/

Шаг 3 (Извлечение). Мы будем использовать curl, чтобы отправить в нашу службу Kafka Connect сообщение запроса JSON, чтобы начать сбор событий из исходной БД с помощью Debezium под капотом (Это требуются учетные данные исходной БД ниже):

curl -i -X POST 
-H "Accept:application/json" 
-H "Content-Type:application/json" 
localhost:8083/connectors/ -d 
'{ "name": "etl-connector", 
"config": {                                                 "connector.class":      "io.debezium.connector.mysql.MySqlConnector", 
"tasks.max": "1", 
"database.hostname": "<mysql_host>", 
"database.port": "3306", 
"database.user": "<mysql_username>", 
"database.password": "<mysql_password>", 
"database.server.id": "184054", 
"database.server.name": "dbserver1", 
"database.whitelist": "<database_name>", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.<db_name>" } }'

Шаг 4 (преобразование и загрузка). Последний шаг - создание потребителя Kafka. Потребитель - это не что иное, как простая функция / код, который будет извлекать события Debezium, преобразовывать их и загружать в место назначения ELK.

Полный шаблон исходного кода можно найти здесь: https://github.com/burhanuddinbhopalwala/etl-elasticsearch-app

Это сделано! В этом примере мы используем массовую вставку для Elasticsearch. И вы можете увидеть журналы, как показано ниже, из исходного кода выше.

...
2017-09-21 07:38:48,385 INFO   MySQL|dbserver1|task  Kafka version : 0.11.0.0   [org.apache.kafka.common.utils.AppInfoParser]
2org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,402 INFO   MySQL|dbserver1|task  Successfully joined group inventory-connector-dbhistory with generation 1   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,403 INFO   MySQL|dbserver1|task  Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connect WorkerSourceTask{id=inventory-connector-0} finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask
INFO -- : CREATING MASTER DB CONNECTION
INFO -- : CONNECT ELASTICSEARCH
INFO -- : CONNECTED KAFKA
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 1, ID: 685475
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 2, ID: 457548
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 3, ID: 985484
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 4, ID: 258547
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 5, ID: 257544

Ошибки: если вы обнаружите какие-либо ошибки, вы всегда можете перейти на сайт Debezium: https://debezium.io/.

Наконец, спасибо за чтение. Надеюсь, этот блог окажется для вас полезным. И всегда не забывай дышать :)

Плейлист:

Большие данные и DevOps:

  • К публикации в области науки о данных - Amazon S3 Data Lake | Хранение и анализ потоковых данных на ходу | Бессерверный подход - https://bit.ly/397bfyF

Разработка программного обеспечения:

  • Сообщество разработчиков - 5 лучших практик использования GIT, которые должен знать каждый разработчик программного обеспечения - за 5 минут - https://bit.ly/3boXIEH