Это вторая часть серии сообщений в блогах, показывающих, как мы развиваем архитектуру Ifood в команде профилей пользователей. Итак, рекомендую вам прочитать первый пост здесь. Это не обязательно, но так будет легче понять, откуда мы пришли и куда хотим идти. Если не хотите, я сделаю краткий обзор.

Краткое резюме

У нас есть микросервис, в котором хранятся метаданные клиентов Ifood (внутри мы называем это метаданными аккаунта), и в часы пик он достигает более 2 миллионов запросов в минуту. Эта система вызывается веб-приложениями и мобильными приложениями, а также многими внутренними командами для получения данных о клиентах. Данные хранятся в одной таблице DynamoDB (с 1,3 миллиардами элементов, использующих 757,3 ГБ).

Какие данные мы там храним? Особенности каждого пользователя Ifood. Некоторые примеры возможностей:
- счетчик суммы заказов
- топ-3 любимых блюд
- любимые рестораны
- в какой сегментации находится пользователь

Это была архитектура:

Главная проблема

Конвейер команды данных, чтобы экспортировать данные из озера данных, обработать их с помощью Databricks и Airflow, а затем, наконец, отправить их в метаданные учетной записи, состоитиз множества шаговимогуточень легко выйти из строя. Часто что-то выходит из строя, и у нас не всегда есть лучший мониторинг каждой отдельной детали. Это означает, что он был ненадежен. Это большая проблема для нас как команды по профилям пользователей, учитывая, что качество наших данных лежит в основе наших забот и интересов.

Это всегда было огромной болью, поэтому мы начали оглядываться и думать, как мы могли бы изменить эту часть инфраструктуры. Это означает изменить прием сотен миллионов записей в день.

Наш спаситель: Feature Store

Пока мы искали альтернативы и пытались понять, как заменить весь процесс загрузки, одна из команд машинного обучения в Ifood создавала новый замечательный инструмент: проект Feature Store (FS). Таким образом, Feature Store — это способ очень легко предоставлять данные и обмениваться ими, расширяя возможности приложений машинного обучения, обучения моделей и прогнозирования в реальном времени. С одной стороны, FS считывает данные откуда-то (озеро данных, хранилище данных, темы Kafka и т. д.), агрегирует их, выполняет какую-то обработку или расчет, а затем экспортирует результаты с другой стороны (API, в какую-то базу данных, Kafka темы и др.).

Когда мы услышали об этом, стало ясно, что это именно то, что нам нужно: централизованный, уникальный и очень организованный способ использования данных из озера данных. Несмотря на то, что мы будем использовать FS для чего-то, что на самом деле не связано с приложениями ML, это будет для нас очень подходящим вариантом использования. Нам бы это сильно облегчило задачу: они бы как-то экспортировали данные, а нам осталось бы сохранить их в нашей базе данных. Мы бы заменили сверхсложный и очень хрупкий конвейер надежным и надежным механизмом. Поговорив с командой Feature Store, мы решили, что они будут экспортировать функции нам через тему Kafka.

Однако FS не могла экспортировать функции партиями, а это означает, что каждому покупателю Ifood (около 60 миллионов) и каждой функции будет экспортировано сообщение. В то время у нас было около 20 функций, но мы уже планировали увеличить их до ~ 30 или 40. Это означает, что 60 миль * 20 = 1,2 миллиарда сообщений в день, но, вероятно, через несколько месяцев это число превысит 1,5 миллиарда.

Таким образом, мы должны иметь возможность потреблять около 1,5 млрд сообщений Kafka в день.

Использование данных из Feature Store

Как я уже сказал, FS будет экспортировать данные в топик Kafka. Мы определили схему следующим образом:

{
  account_id: string
  feature_name: string
  feature_value: string
  namespace: string
  timestamp: int
}

С этим мы могли бы создать потребителя, который будет слушать тему Kafka и сохранять функции в таблице DynamoDB.

В таблице Dynamo мы используем account_id в качестве ключа раздела и namespace в качестве ключа сортировки. Как следует из названия, пространство имен разделяет данные, предоставляемые системой метаданных учетной записи, по разным контекстам.

Вот так выглядит наша таблица:

| account_id | namespace | columns and its values… |
| 283a385e-8822–4e6e-a694-eafe62ea3efb | orders | total_orders: 3 | total_orders_lunch: 2 |
| c26d796a-38f9–481f-87eb-283e9254530f | rewards | segmentation: A |

Вот как будет выглядеть архитектура:

Потребитель читает тему Kafka и сохраняет данные в DynamoDB.

Мы сделали первую реализацию нашего потребителя с использованием Java. Он работает довольно хорошо, но очень далеко от того, что нам нужно: 4k функций, потребляемых в секунду на pod/потребителя. Мы пробовали некоторые настройки и разные конфигурации, но до 1.5 bi все еще было далеко.

После этого мы попробовали другую реализацию с Go, используя goka, высокоуровневую библиотеку Go для взаимодействия с Kafka. Это было намного лучше: 8,5 тыс. функций, потребляемых в секунду на pod/consumer. Тем не менее, это все еще довольно далеко от того, что нам нужно.

Наконец, по-прежнему используя Go, но с sarama, мы можем реализовать обработчик, потребляющий 1 миллион событий в минуту (20 000 функций, потребляемых в секунду на модуль/потребителя). Каждый модуль/потребитель создает три горутины для обработки сообщений, полученных от Kafka. Ура, мы сделали это! Это была третья попытка, поэтому мы научились правильно настраивать клиент Kafka, устанавливать правильный размер пакета данных для чтения и так далее.

На графике Grafana ниже вы можете увидеть количество сообщений, обрабатываемых с течением времени (разные цвета представляют разные пространства имен).

Обработка более 1 миллиона событий в минуту привела к большому количеству операций записи в DynamoDB. Сначала база данных сильно пострадала, поэтому нам пришлось масштабировать ее до 50 000 единиц записи в секунду.

Тестирование

После того, как потребитель был готов, мы должны были убедиться, что мы не теряем сообщения, не возникают проблемы с условиями гонки и что данные сохраняются правильно. Чтобы проверить это, мы добавили жестко заданный префикс к имени пространства имен в потребителе, чтобы сохранить, скажем, данные из пространства имен orders в пространство имен testing-orders. При этом мы написали пару скриптов для сканирования базы данных и сравнили значения между пространствами имен (заказы и заказы на тестирование), чтобы убедиться, что они одинаковы.

Делаем лучше и быстрее

Ну, у нас была первая версия. Он работал правильно, очень быстро потреблял события и правильно сохранял их в DynamoDB. Однако затраты на базу данных были действительно высоки, и этот первый подход был совершенно неэффективен, потому что мы делаем одну запись для каждого сообщения в Dynamo, хотя у нас было несколько сообщений от одного и того же account_id, довольно близких к теме Kafka, учитывая, что мы используя account_id в качестве ключевого раздела. Что-то вроде этого:

| account_id | feature_name | feature_value | namespace | timestamp|
| user1 | total_orders | 3 | orders | ts1 |
| user1 | total_orders_lunch | 1 | orders | ts3 |
| user1 | segmentation | foo | rewards | ts2 |
| user2 | segmentation | foo | rewards | ts2 |

Как видите, первая и вторая записи относятся к одному и тому же пользователю и к одному и тому же пространству имен. С account_id в качестве ключевого раздела события из одного и того же account_id будут потребляться одним и тем же потребителем. При этом мы могли бы создать своего рода буфер в памяти для агрегирования и объединения событий с одним и тем же идентификатором account_id и пространством имен и однократной записи их в Dynamo. Что мы и сделали.

Мы также изменили потребителя Kafka, чтобы он извлекал данные из топика в пакетном режиме, увеличив количество байтов «на выборку», что привело к увеличению количества сообщений. Вместо того, чтобы получать и обрабатывать по одному сообщению за раз, мы извлекаем 1000 сообщений, создаем карту, где ключ — account_id, а значение — список функций, и делаем одно сохранение в Dynamo для каждого account_id на карте. При этом мы сократили количество операций в Dynamo в 4 раза.

Одна важная деталь заключается в том, что при обработке одного сообщения за раз было легко пометить сообщение как «обработанное» и зафиксировать его. Однако при обработке тысячи из них не так-то просто пометить их как прочитанные и зафиксировать, если в процессе обработки пара сообщений из этого пакета выйдет из строя. Мы должны были быть более осторожными с частью кода.

Еще одна важная конфигурация — это то, как вы масштабируете новые модули, потому что, если вы не сделаете это очень хорошо, потребители могут потратить много времени на ребалансировку, и пока они это делают, они не будут потреблять события. Мы настроили автоматическое масштабирование Kubernetes для масштабирования новых модулей в зависимости от использования ЦП каждым модулем. Таким образом, как только модуль достигает 30% загрузки ЦП, он масштабируется на новый. Мы использовали низкий порог для быстрого масштабирования множества модулей, чтобы потребители не тратили много времени на перебалансировку.

Мониторинг

Чтобы контролировать рабочий процесс, мы потратили много времени, чтобы иметь очень хорошую наблюдаемость процесса. Мы использовали много Prometheus/Grafana для создания пользовательских метрик для потребителей и получения метрик подов; Datadog для сбора метрик из топика Kafka и создания дашбордов о лагах группы потребителей и сбора метрик кластера в целом; New Relic для сбора ошибок от потребителей и получения немного больше данных от потребителей.

Основные выводы

  • Скорее всего, с первой попытки вы не добьетесь наилучшего результата. Делайте заметки о том, что хорошо, а что нет, и проводите тщательное исследование, пытаясь найти, что может быть лучше.
  • Обработка такого количества данных может быть пугающей, но это не конец света. Это требует концентрации и внимания к деталям, но вы можете это сделать.
  • При обработке такого объема данных также трудно убедиться, что вы не теряете данные, не имеете проблем с состоянием гонки и так далее. Потратьте на это много времени.
  • Требуется некоторая настройка, чтобы сделать потребителей Kafka очень быстрыми. Мой главный совет — внимательно прочитать документацию и поэкспериментировать с параметрами. Основные из них — «fetch.min.bytes», «автоматическая фиксация» и «установка максимальных интервалов».