Простая система сообщений pub-sub на Java с использованием Apache Camel и Kafka

Привет ребята! Сегодня я хочу поговорить о создании и использовании сообщений с помощью Java, Spring, Apache Camel и Kafka. Многие приложения сегодня используют потоковую передачу событий и системы публикации сообщений для связи друг с другом. Одна из последних, которые я использовал, - это Apache Kafka, распределенная потоковая платформа, которая в основном упрощает публикацию и подписку на темы и обеспечивает высокую производительность за счет распараллеливания потребителей. Эта статья предназначена для всех, кто хочет использовать Kafka и, возможно, будет выполнять простое базовое использование, используя абстракцию, предоставляемую другим фреймворком: Apache Camel.

Apache Camel - это инфраструктура корпоративной интеграции (я люблю называть ее швейцарским ножом интеграции), которая включает в себя сотни готовых к использованию компонентов для интеграции с библиотеками, фреймворками и методами, известными на предприятии. промышленность и в мире открытого исходного кода.

Цель

В моем последнем проекте я использовал Apache Camel, чтобы придать проекту необходимую гибкость с маршрутами, и при этом я также использовал абстракцию, которую Camel предлагает для работы с темами Kafka. Наша цель состояла в том, чтобы предоставить нашим проектам микросервисов способ создания и использования сообщений из каждой темы Kafka, не беспокоясь о природе сообщения и используя модель абстракции, которая инкапсулирует все возможные сообщения, которыми мы могли бы обмениваться.

Модель - MessageWrapper

Прежде всего, мы начали определять единую модель с целью абстрагироваться от каждого сообщения, которым мы будем обмениваться. Имея это в виду, мы определили следующий класс:

В модели MessageWrapper у нас есть следующие поля:

  • timestamp: сохраняет, когда сообщение было опубликовано в теме Kafka.
  • callerModule: содержит информацию о том, кто был издателем сообщения (при необходимости).
  • messageType: настраиваемое поле для определения типа сообщений в терминах классов Java, которыми мы хотели бы обмениваться с темами Kafka.
  • payload: строка с представлением JSON, инкапсулирующая реальное сообщение, которым обмениваются.

И затем у нас есть перечисление MessageType, определенное следующим образом:

Посредством этого перечисления мы говорим, что можем обмениваться двумя типами сообщений: простыми строками и элементами (более сложными объектами). Чтобы дать вам представление, класс Item может быть следующим:

ПродюсерСервис

Теперь нам нужно центральное место для инкапсуляции всех сообщений, идущих к теме Kafka, в MessageWrapper, и это место - ProducerService class. Мы назвали «Продюсер», потому что он вызывается только при отправке сообщения Kafka, а не при его использовании. Класс ProducerService реализован следующим образом:

Когда я закончил с моделями, я собираюсь объяснить вам часть этого урока. По сути, центральным компонентом является ProducerTemplate фреймворка Camel: это общий компонент, выполняющий функцию публикации и отправки объекта (который мы также называем полезной нагрузкой) в конкретную конечную точку; в нашем случае конечная точка - это точка, указывающая тему Kafka.

В классе ProducerService у нас есть два метода sendBody: самый простой (строки 47–49) отправляет полезные данные непосредственно в конечную точку Camel; вместо этого другой метод sendBody (строки 35–40) полезен нам для обмена сообщениями в стандартной форме и выполняет следующие шаги:

  • берет полезную нагрузку и преобразует ее в формат JSON
  • используйте метод encapsulateMessage для создания объекта MessageWrapper
  • отправить объект MessageWrapper в конечную точку Camel относительно темы Kafka

Следовательно, с помощью этого класса мы можем взять любой тип сообщения, инкапсулировать в объект MessageWrapper и опубликовать его в теме Kafka.

Сериализация и десериализация сообщений

При создании и использовании сообщения в теме Kafka у нас есть возможность указать настраиваемый сериализатор, а также настраиваемый десериализатор. Мы стремимся обмениваться сообщениями взаимозаменяемым и уникальным способом, поэтому мы используем пользовательские компоненты для сериализации и десериализации.

Затем мы реализовали сериализатор, класс MessageWrapperSerializer, который несет единственную ответственность за преобразование MessageWrapper в строку JSON с помощью метода serialize следующим образом:

Затем мы реализовали соответствующий десериализатор, класс MessageWrapperDeserializer, чтобы взять JSON, который используется из темы Kafka, преобразовать его в объект MessageWrapper и получить полезную нагрузку, которая нас интересует, с помощью десериализовать следующим образом:

Что нам остается делать со всем этим классом? Конфигурации и запуск примера.

Конфигурации среды

Чтобы ускорить настройку среды, я использовал Docker со следующими настройками docker-compose.yml:

С такими конфигурациями довольно быстро настроить среду и заставить Zookeeper и Kafka работать на вас. Примечание: для запуска среды с терминала войдите в папку проекта и дайте команду:

docker-compose up

Для всех остальных конфигураций, таких как зависимости Maven, или для полного кода этой статьи я оставляю ссылку на мой личный репозиторий Github для проекта: https://github.com/dariux2016/template-projects/tree/master/camel -кафка-производитель

Конфигурации Kafka

В дополнение к среде на уровне приложения мы должны настроить параметры для правильного взаимодействия с Kafka. Затем мы создали следующий application.yml:

В этой конфигурации мы установили некоторые вещи:

  • связь с брокерами Kafka
  • некоторые свойства Kafka Producer и Consumer, такие как сериализатор и десериализатор сообщений, которые мы настроили
  • база URI Kafka для интересующей нас Темы, которую я назвал здесь ПРИМЕР-ТЕМА

Конфигурации верблюда

То, что мы видели до сих пор, ориентировано на создание сообщения (за исключением компонента MessageWrapperDeserializer). Теперь мы видим, как настроить маршрут, использующий объекты MessageWrapper из темы Kafka, которую мы опубликовали через ProducerService.

Внутри папки src / main / resources мы можем создать папку «camel», в которую мы можем поместить некоторые файлы XML для загрузки в качестве маршрутов Camel. Верблюжий маршрут для получения сообщения легко записать следующим образом:

URI заменяется при запуске маршрута со свойствами, настроенными в application.yml. При использовании с этой конечной точки он вызывается компонентом, который является простым потребителем сообщений, поступающих в тему Kafka: ConsumerBean.

Потребительский компонент - это не что иное, как класс компонентов Spring, а именно:

Тесты

После всех реализаций я сделал и поделился ниже простым тестовым классом, с помощью которого я выполнил руководство и получил все, что работает. С помощью этого теста я опубликовал два сообщения в теме Kafka:

  • строка «привет, мир»
  • товар с кодом «А» и названием «первый товар»

Оба инкапсулируются в MessageWrapper и публикуются в Kafka. После того, как маршруты Camel начинают использовать, конфигурация запускает MessageWrapperDeserializer, который принимает полезные данные и помещает их в основную часть Camel Exchange. При этом ConsumerBean просто возьмет тело и поглотит его.

Что ж, я закончил. Я надеюсь, что вам понравится эта история, и если у вас есть какие-либо вопросы или предложения по ней, пожалуйста, оставьте мне комментарий.