Настройка конвейера ETL с помощью нескольких команд
Чего не следует ожидать от этого блога? Управляемые решения ETL, такие как AWS Glue, AWS Data Migration Service или Apache Airflow. Облачные технологии управляются, но не бесплатны. И не рассматриваются в этой статье.
Оглавление
- Что такое конвейер ETL?
- Каковы различные варианты использования конвейера ETL?
- Предварительные требования для ETL - Docker + Debezium + Kafka + Kafka Connect - Вид с высоты птичьего полета
- Настройка ETL - четырехэтапный процесс
1: Что такое ETL?
ETL расшифровывается как Extract Transform Load pipeline. И он используется для создания хранилища данных или озера данных.
Примечание. Хранилище данных собирает несколько структурированных источников данных, таких как реляционные базы данных, но в озере данных мы храним как структурированные, так и неструктурированные данные.
2. Каковы различные варианты использования конвейера ETL?
ETL имеет следующие широкие варианты использования:
- Придание структуры неструктурированным данным, потому что мы храним их в хранилище данных, которое обычно мы используем для хранения структурированных данных из нескольких ресурсов.
- Конвейер данных для инженеров машинного обучения для получения данных для обучения моделей. И довольно часто это первая задача для инженера машинного обучения / специалиста по данным (L1).
- Для создания резервной копии или промежуточного источника данных.
Примечание. В этом примере мы будем использовать источник как базу данных MySQL и место назначения как Elasticsearch, который по своей сути интегрирован с Kibana для визуализации данных и машинного обучения.
3: предварительные требования к ETL - Docker + Debezium + Kafka + Kafka Connect - вид с высоты птичьего полета
Как видно из рисунка выше, мы собираемся использовать следующее:
- Docker. Система управления контейнерами (CMS). Мы используем Docker для простоты. Https://www.docker.com/
- Debezium: Debezium - это не что иное, как система отслеживания измененных данных (CDC). Которая отслеживает каждое событие (вставка, обновление, удаление) из исходной БД и отправляет событие в Kafka с помощью Kafka Connect. Он использует журналы исходной базы данных для чтения каждой транзакции и создания события для конкретной транзакции.
Примечание. В случае MySQL мы назвали его binlog, а в случае PostgreSQL мы назвали его wal-logs (Write Ahead Log).
- Kafka Connect: как следует из названия, он помогает Debezium соединяться с Kafka.
- Kafka: Kafka помогает в потоковой передаче и обработке событий в режиме реального времени. Kafka работает с Zookeeper для отслеживания событий. Https://bit.ly/2Gb9Sm7
- ELK (пункт назначения): мы рассматриваем Elasticsearch в качестве источника данных назначения, который по умолчанию также интегрирован с Kibana для визуализации данных и машинного обучения, это популярно и известно как Elasticsearch +. Logstash + Kibana (стек ELK) https://bit.ly/36dmioe
4. Приступим к настройке - четырехэтапный процесс
Шаг 1:. Изменение формата двоичного журнала MySQL, который нравится Debezium: просто перейдите в /etc/my.cnf, в основном в файл конфигурации MySQL, и добавьте следующие конфигурации:
Шаг 2: Запуск Zookeeper, Kafka и Kafka Connect с помощью Docker:
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.0
Примечание. Если вы знакомы с Docker, вы можете использовать docker-compose.yaml. Вы можете найти его здесь: https://github.com/debezium/debezium-examples/blob/master/tutorial/
Шаг 3 (Извлечение). Мы будем использовать curl
, чтобы отправить в нашу службу Kafka Connect сообщение запроса JSON, чтобы начать сбор событий из исходной БД с помощью Debezium под капотом (Это требуются учетные данные исходной БД ниже):
curl -i -X POST
-H "Accept:application/json"
-H "Content-Type:application/json"
localhost:8083/connectors/ -d
'{ "name": "etl-connector",
"config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "<mysql_host>",
"database.port": "3306",
"database.user": "<mysql_username>",
"database.password": "<mysql_password>",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "<database_name>", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.<db_name>" } }'
Шаг 4 (преобразование и загрузка). Последний шаг - создание потребителя Kafka. Потребитель - это не что иное, как простая функция / код, который будет извлекать события Debezium, преобразовывать их и загружать в место назначения ELK.
Полный шаблон исходного кода можно найти здесь: https://github.com/burhanuddinbhopalwala/etl-elasticsearch-app
Это сделано! В этом примере мы используем массовую вставку для Elasticsearch. И вы можете увидеть журналы, как показано ниже, из исходного кода выше.
... 2017-09-21 07:38:48,385 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser] 2org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:38:48,402 INFO MySQL|dbserver1|task Successfully joined group inventory-connector-dbhistory with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 2017-09-21 07:38:48,403 INFO MySQL|dbserver1|task Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connect WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask INFO -- : CREATING MASTER DB CONNECTION INFO -- : CONNECT ELASTICSEARCH INFO -- : CONNECTED KAFKA INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 1, ID: 685475 INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 2, ID: 457548 INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 3, ID: 985484 INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 4, ID: 258547 INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 5, ID: 257544
Ошибки: если вы обнаружите какие-либо ошибки, вы всегда можете перейти на сайт Debezium: https://debezium.io/.
Наконец, спасибо за чтение. Надеюсь, этот блог окажется для вас полезным. И всегда не забывай дышать :)
Плейлист:
Большие данные и DevOps:
- К публикации в области науки о данных - Amazon S3 Data Lake | Хранение и анализ потоковых данных на ходу | Бессерверный подход - https://bit.ly/397bfyF
Разработка программного обеспечения:
- Сообщество разработчиков - 5 лучших практик использования GIT, которые должен знать каждый разработчик программного обеспечения - за 5 минут - https://bit.ly/3boXIEH