Apache Kafka — это распределенная платформа потоковой передачи, которая используется для создания конвейеров данных в реальном времени и потоковых приложений.
Это хорошо масштабируемая, отказоустойчивая и быстрая система обмена сообщениями, способная обрабатывать большие объемы данных. Kafka предназначена для обработки потоков данных в режиме реального времени с помощью модели публикации-подписки, что означает, что производители данных публикуют сообщения в темах Kafka, а потребители данных подписываются на эти темы и получают данные в режиме реального времени.
В этом уроке мы создадим простое приложение, которое использует машинное обучение для прогнозирования цены дома на основе его характеристик. Мы будем использовать Apache Kafka для создания конвейера данных, который передает данные из веб-приложения в нашу модель машинного обучения, а затем отправляет прогноз обратно в веб-приложение.
Предпосылки
Прежде чем мы начнем, вам необходимо установить следующие инструменты и технологии:
- Питон 3
- Апач Кафка
- Колба
- Scikit-learn
Шаг 1. Настройте среду разработки
Во-первых, давайте создадим новую виртуальную среду Python, чтобы наши зависимости были отделены от других проектов. Вы можете создать новую виртуальную среду, используя следующую команду:
$ python3 -m venv myenv
Активируйте виртуальную среду:
$ source myenv/bin/activate
Затем давайте установим необходимые зависимости:
$ pip install kafka-python flask scikit-learn
Шаг 2. Создайте модель машинного обучения
Далее давайте создадим простую модель машинного обучения, которая будет прогнозировать цену дома на основе его характеристик. Создайте новый файл с именем model.py
и добавьте следующий код:
from sklearn.linear_model import LinearRegression class HousePricePredictor: def __init__(self): self.model = LinearRegression() def train(self, X, y): self.model.fit(X, y) def predict(self, X): return self.model.predict(X)
Этот код определяет класс HousePricePredictor
, который использует алгоритм LinearRegression
из Scikit-learn для прогнозирования цен на жилье на основе их характеристик. У нас есть два метода: train
, который обучает модель заданному набору функций и целевых цен, и predict
, который берет набор функций и возвращает прогнозируемую цену.
Шаг 3: Настройте Кафку
Теперь давайте настроим Кафку. Начните с загрузки Kafka с веб-сайта Apache и извлечения файлов. Затем откройте окно терминала и перейдите в каталог Kafka. Запустите сервер Kafka, выполнив следующую команду:
$ bin/kafka-server-start.sh config/server.properties
Затем создайте новую тему Kafka, которую мы будем использовать для отправки и получения данных. Откройте новое окно терминала и выполните следующую команду:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic house-prices
Эта команда создает новую тему с именем house-prices
с одним разделом и одной репликой.
Шаг 4: Создайте веб-приложение Flask
Теперь давайте создадим простое веб-приложение Flask, которое позволит пользователям вводить характеристики дома и получать прогнозируемую цену. Создайте новый файл с именем app.py
и добавьте следующий код:
from flask import Flask, request, jsonify from kafka import KafkaProducer, KafkaConsumer from model import HousePricePredictor import json app = Flask(__name__) predictor = HousePricePredictor() producer = KafkaProducer(bootstrap_servers='localhost:9092') @app.route('/predict', methods=['POST']) def predict(): data = json.loads(request.data.decode('utf-8')) features = [data['area'], data['bedrooms'], data['bathrooms']] prediction = predictor.predict([features])[0] producer.send('house-prices', json.dumps({'prediction': prediction}).encode('utf-8')) return jsonify({'prediction': prediction}) if __name__ == '__main__': app.run()
Этот код создает веб-приложение Flask с одной конечной точкой с именем /predict
. Эта конечная точка принимает запрос POST с тремя параметрами: area
, bedrooms
и bathrooms
, которые представляют характеристики дома.
Мы используем класс HousePricePredictor
для прогнозирования цены дома, а затем отправляем прогноз с помощью Kafka.
Шаг 5: Создайте потребителя Kafka
Теперь давайте создадим потребителя Kafka, который будет прослушивать прогнозы нашей модели машинного обучения и отображать их в консоли. Создайте новый файл с именем consumer.py
и добавьте следующий код:
from kafka import KafkaConsumer consumer = KafkaConsumer('house-prices', bootstrap_servers='localhost:9092', auto_offset_reset='earliest') for message in consumer: prediction = json.loads(message.value.decode('utf-8'))['prediction'] print(f"Received prediction: {prediction}")
Этот код создает потребителя Kafka, который прослушивает сообщения в теме house-prices
. Когда сообщение получено, оно извлекает значение предсказания и выводит его на консоль.
Шаг 6: Запустите приложение
Теперь, когда у нас есть все элементы, давайте запустим приложение. Начните с запуска веб-приложения Flask:
$ python app.py
Затем откройте новое окно терминала и запустите потребителя Kafka:
$ python consumer.py
Наконец, откройте веб-браузер и перейдите к http://localhost:5000
. Вы должны увидеть простую форму, позволяющую вводить характеристики дома. Введите некоторые образцы данных и нажмите кнопку «Предсказать». Вы должны увидеть прогнозируемую цену, напечатанную в консоли, где работает потребитель Kafka.
Поздравляем! Вы успешно создали приложение с искусственным интеллектом, используя ML и Apache Kafka. Теперь вы можете настроить это приложение в соответствии со своими конкретными потребностями или использовать его в качестве отправной точки для своих собственных проектов.
Если вам понравилось читать это руководство и оно оказалось полезным, поддержите меня в Купи мне кофе 😎
Дополнительные материалы на PlainEnglish.io.
Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Подпишитесь на нас в Twitter, LinkedIn, YouTube и Discord .
Заинтересованы в масштабировании запуска вашего программного обеспечения? Ознакомьтесь с разделом Схема.