Проектирование системы аналитической обработки в реальном времени

Я разрабатываю систему, которая должна анализировать большое количество пользовательских транзакций и производить агрегированные показатели (такие как тенденции и т. д.). Система должна работать быстро, быть надежной и масштабируемой. Система основана на Java (на Linux).

Данные поступают из системы, которая генерирует файлы журналов (на основе CSV) пользовательских транзакций. Система генерирует файл каждую минуту, и каждый файл содержит транзакции разных пользователей (отсортированные по времени), каждый файл может содержать тысячи пользователей.

Пример структуры данных для CSV-файла:

10:30:01,пользователь 1,...
10:30:01,пользователь 1,...
10:30:02,пользователь 78,...
10:30:02 ,пользователь 2,...
10:30:03,пользователь 1,...
10:30:04,пользователь 2,...
. . .

Система, которую я планирую, должна обрабатывать файлы и выполнять некоторый анализ в режиме реального времени. Он должен собирать входные данные, отправлять их нескольким алгоритмам и другим системам и сохранять вычисленные результаты в базе данных. База данных не содержит фактических входных записей, а только высокоуровневый агрегированный анализ транзакций. Например, тренды и т. д.

Первый алгоритм, который я планирую использовать, требует для лучшей работы не менее 10 пользовательских записей, если он не может найти 10 записей через 5 минут, он должен использовать все доступные данные.

Я хотел бы использовать Storm для реализации, но я бы предпочел оставить это обсуждение на уровне дизайна, насколько это возможно.

Список компонентов системы:

  1. Задача, которая отслеживает входящие файлы каждую минуту.

  2. Задача, которая читает файл, анализирует его и делает доступным для других системных компонентов и алгоритмов.

  3. Компонент для буферизации 10 записей для пользователя (не более 5 минут), когда собрано 10 записей или прошло 5 минут, пора отправлять данные в алгоритм для дальнейшей обработки. Поскольку требование состоит в том, чтобы предоставить как минимум 10 записей для алгоритма, я подумал об использовании Storm Field Grouping (что означает, что одна и та же задача вызывается для одного и того же пользователя) и отслеживать сбор 10 пользовательских записей внутри задачи, конечно, я планирую чтобы иметь несколько таких задач, каждая обрабатывает часть пользователей.

  4. Есть и другие компоненты, которые работают с одной транзакцией, для них я планирую создать другие задачи, которые получают каждую транзакцию по мере ее разбора (параллельно с другими задачами).

Мне нужна ваша помощь с № 3.

Каковы наилучшие методы разработки такого компонента? Очевидно, что необходимо хранить данные по 10 записям для каждого пользователя. Карта ключ-значение может помочь. Лучше ли управлять картой в самой задаче или использовать распределенный кеш? Например, Redis — хранилище значений ключей (я никогда раньше им не пользовался).

Спасибо за вашу помощь


person user1550706    schedule 25.07.2012    source источник


Ответы (2)


Я немного работал с Redis. Итак, я прокомментирую вашу мысль об использовании Redis

№ 3 имеет 3 требования

  1. Буфер на пользователя

  2. Буфер на 10 задач

  3. Должен истекать каждые 5 минут

<сильный>1. Буфер на пользователя: Redis — это просто хранилище ключевых значений. Хотя он поддерживает широкий спектр типов данных, они всегда являются значениями, сопоставленными с ключом STRING. Итак, вы должны решить, как однозначно идентифицировать пользователя, если вам нужен буфер для каждого пользователя. Потому что в Redis вы никогда не получите ошибку при переопределении нового значения ключа. Одним из решений может быть проверка существования перед записью.

<сильный>2. Буфер на 10 задач: очевидно, что вы можете реализовать очередь в Redis. Но ограничение его размера остается за вами. Пример: использование LPUSH и LTRIM или использование LLEN для проверки длины и принятия решения о запуске вашего процесса. Ключ, связанный с этой очередью, должен быть тем, который вы выбрали в части 1.

<сильный>3. Срок действия буфера истекает через 5 минут: это самая сложная задача. В Redis каждый ключ, независимо от базового типа данных, который он имеет, может иметь expiry. Но процесс истечения молчит. Вы не будете получать уведомления об истечении срока действия любого ключа. Таким образом, вы незаметно потеряете свой буфер, если воспользуетесь этим свойством. Одним из способов решения этой проблемы является наличие index. Это означает, что индекс сопоставит метку времени с ключами, срок действия которых должен истечь при этом значении метки времени. Затем в фоновом режиме вы можете каждую минуту читать индекс и вручную удалять ключ [после чтения] из Redis и вызывать нужный процесс с данными буфера. Чтобы иметь такой индекс, вы можете посмотреть отсортированные наборы. Где временная метка будет вашей score, а набор member будет ключами [уникальный ключ для каждого пользователя, определенный в части 1, который сопоставляется с очередью], который вы хотите удалить с этой временной меткой. Вы можете сделать zrangebyscore, чтобы прочитать все элементы набора с указанной отметкой времени

В целом:

Используйте Redis List для реализации очереди.

Используйте LLEN, чтобы убедиться, что вы не превышаете лимит в 10.

Всякий раз, когда вы создаете новый список, делайте запись в индексе [Sorted Set] с Score как Current Timestamp + 5 min и Value в качестве ключа списка.

Когда LLEN достигнет 10, не забудьте прочитать, а затем удалить ключ из индекса [отсортированный набор] и из базы данных [удалить ключ-> список]. Затем запустите процесс с данными.

Каждую минуту создавайте текущую временную метку, читайте индекс и для каждого ключа считывайте данные, затем удаляйте ключ из базы данных и запускайте свой процесс.

Это может быть мой способ реализовать это. Может быть какой-то другой лучший способ смоделировать ваши данные в Redis

person Tamil    schedule 25.07.2012

Для ваших требований 1 и 2: [Apache Flume или Kafka]

Для вашего требования № 3: [Эспер Болт внутри Шторма. В Redis для этого вам придется переписать логику Эспера.]

person Yavar    schedule 22.07.2013