TL;DR

Я продемонстрировал алгоритм Наиболее часто встречающиеся элементы на стриме в Твиттере. Вы можете проверить исходный код проекта по адресу: https://github.com/efekaptan/top-k.

Введение

Поиск лучших элементов из конечного набора или живого потока — простой процесс. Но все становится сложнее, когда ваш расчет зависит от скользящего временного интервала. Хорошим примером такой проблемы может быть поиск «K самых частых слов из потока Twitter за последние 10 минут».

Можем ли мы найти алгоритм, обеспечивающий быструю вставку (O(1)), быстрое удаление (O(1)) и быстрый выбор (O(1)) в потоке данных? Сначала я решил использовать структуру данных Куча для быстрой вставки и выделения, но понял, что кучи не подходят для частых операций удаления. Поскольку каждое удаление кучи требует операции кучи, которая стоит O (logn).

Я провел небольшое исследование по этой теме, изучил пару алгоритмов потоков данных и нашел решение, которое может эффективно выполнять все операции. Я предполагаю, что скользящая часть данных может уместиться в памяти.

Алгоритм

Алгоритм достаточно простой, комбинации из двух связанных списков + одна хеш-таблица. Прежде чем углубляться в детали, давайте посмотрим на визуализацию алгоритма:

Каждый узел счета (красный цвет) имеет указатель головы, который указывает на узел слова. Слова с одинаковыми частотами создают двойной связанный список, и каждый узел также указывает на счетный узел. Всякий раз, когда мы получаем новое слово из потока, мы сначала проверяем хеш-таблицу, чтобы найти ссылку на узел. Затем узел перемещается к верхнему «счетному» узлу, что обеспечивает быструю вставку.

Операция удаления работает аналогичным образом. Слово приходит из потока «удаление» и перемещается в узел с меньшим количеством.

Выбор лучших K элементов прост. Начните с самого высокого узла подсчета частот, выберите головной узел и перейдите к следующему.

Выполнение

Я использовал Java для серверных служб и React для внешнего интерфейса. В качестве системы обмена сообщениями была выбрана Kafka.

Основными компонентами системы являются:

  • Tweet Exporter (который подписывается на поток Twitter и отправляет твиты Кафке)
  • Term Exporter (который получает твиты от Kafka и преобразует их в слова)
  • Frequency-api (который получает слова от Kafka и обслуживает частотный результат)
  • Фронтенд (который визуализирует результат)

Пара замечаний по реализации:

  • Я использовал официальный Twitter api для трансляции твитов. Поток был отфильтрован, чтобы получать только твиты на английском языке. Систему можно расширить, чтобы добавить еще одного экспортера (производителя) с другим языком.
  • Экспортер терминов выполняет базовую английскую токенизацию для каждого твита. Удаляет стоп-слова, URL-адреса, смайлики, «RT», упоминания и небуквенно-цифровые символы.
String tweet = "RT @gorillaz: Meanwhile, at Kong Studios... \uD83E";
List<String> expected = Arrays.asList("kong", "studios");
Assert.assertEquals(expected, getTokens(tweet));
  • Слова ставятся в очередь на две темы Кафки. Один для вставки и один для удаления. Каждое слово имеет значение Timestamp для проверки 10-минутного интервала перед удалением.
  • Каждый «частотный API» использует темы Kafka с другим groupId. Это позволяет нам поддерживать репликацию данных на каждом API для обеспечения высокой доступности.
  • Frontend опрашивает лучшие K слов каждые 10 секунд. Кроме того, использует поток слов с подключением WebSocket. Я уменьшил поток WebSocket %90, чтобы иметь возможность видеть слова в реальном времени (иначе слова скользят очень быстро).

Метрики

Некоторые показатели этой демонстрации:

  • Скорость экспортера твитов составляет 15 твитов в секунду (только на английском языке).
  • Скорость экспортера терминов 80 слов/сек.
  • В среднем в системе счетчиков (памяти API) существует 15 000 слов в течение 10-минутного интервала.

Вывод

Я попытался продемонстрировать простой подход к этой знаменитой проблеме «K наиболее часто встречающихся элементов». Следующие вопросы должны быть:

  • Что делать, если мои скользящие данные не помещаются в память?
  • Как рассчитать, если интервал больше 10 минут, например: последний час или последний день?
  • Как масштабировать, если поток данных содержит огромные объемы данных?

Ответы на эти вопросы — хороший кандидат на новую статью.

Вы можете ознакомиться с исходным кодом проекта по адресу: https://github.com/efekaptan/top-k.

Буду признателен, если вы поделитесь со мной своими мыслями об этой демонстрации. Ваши отзывы всегда приветствуются. Спасибо за чтение.