Apache Kafka — это распределенная платформа потоковой передачи, которая используется для создания конвейеров данных в реальном времени и потоковых приложений.

Это хорошо масштабируемая, отказоустойчивая и быстрая система обмена сообщениями, способная обрабатывать большие объемы данных. Kafka предназначена для обработки потоков данных в режиме реального времени с помощью модели публикации-подписки, что означает, что производители данных публикуют сообщения в темах Kafka, а потребители данных подписываются на эти темы и получают данные в режиме реального времени.

В этом уроке мы создадим простое приложение, которое использует машинное обучение для прогнозирования цены дома на основе его характеристик. Мы будем использовать Apache Kafka для создания конвейера данных, который передает данные из веб-приложения в нашу модель машинного обучения, а затем отправляет прогноз обратно в веб-приложение.

Предпосылки

Прежде чем мы начнем, вам необходимо установить следующие инструменты и технологии:

  • Питон 3
  • Апач Кафка
  • Колба
  • Scikit-learn

Шаг 1. Настройте среду разработки

Во-первых, давайте создадим новую виртуальную среду Python, чтобы наши зависимости были отделены от других проектов. Вы можете создать новую виртуальную среду, используя следующую команду:

$ python3 -m venv myenv

Активируйте виртуальную среду:

$ source myenv/bin/activate

Затем давайте установим необходимые зависимости:

$ pip install kafka-python flask scikit-learn

Шаг 2. Создайте модель машинного обучения

Далее давайте создадим простую модель машинного обучения, которая будет прогнозировать цену дома на основе его характеристик. Создайте новый файл с именем model.py и добавьте следующий код:

from sklearn.linear_model import LinearRegression

class HousePricePredictor:
    def __init__(self):
        self.model = LinearRegression()

    def train(self, X, y):
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X)

Этот код определяет класс HousePricePredictor, который использует алгоритм LinearRegression из Scikit-learn для прогнозирования цен на жилье на основе их характеристик. У нас есть два метода: train, который обучает модель заданному набору функций и целевых цен, и predict, который берет набор функций и возвращает прогнозируемую цену.

Шаг 3: Настройте Кафку

Теперь давайте настроим Кафку. Начните с загрузки Kafka с веб-сайта Apache и извлечения файлов. Затем откройте окно терминала и перейдите в каталог Kafka. Запустите сервер Kafka, выполнив следующую команду:

$ bin/kafka-server-start.sh config/server.properties

Затем создайте новую тему Kafka, которую мы будем использовать для отправки и получения данных. Откройте новое окно терминала и выполните следующую команду:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic house-prices

Эта команда создает новую тему с именем house-prices с одним разделом и одной репликой.

Шаг 4: Создайте веб-приложение Flask

Теперь давайте создадим простое веб-приложение Flask, которое позволит пользователям вводить характеристики дома и получать прогнозируемую цену. Создайте новый файл с именем app.py и добавьте следующий код:

from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumer
from model import HousePricePredictor
import json

app = Flask(__name__)
predictor = HousePricePredictor()
producer = KafkaProducer(bootstrap_servers='localhost:9092')

@app.route('/predict', methods=['POST'])
def predict():
    data = json.loads(request.data.decode('utf-8'))
    features = [data['area'], data['bedrooms'], data['bathrooms']]
    prediction = predictor.predict([features])[0]
    producer.send('house-prices', json.dumps({'prediction': prediction}).encode('utf-8'))
    return jsonify({'prediction': prediction})

if __name__ == '__main__':
    app.run()

Этот код создает веб-приложение Flask с одной конечной точкой с именем /predict. Эта конечная точка принимает запрос POST с тремя параметрами: area, bedrooms и bathrooms, которые представляют характеристики дома.

Мы используем класс HousePricePredictor для прогнозирования цены дома, а затем отправляем прогноз с помощью Kafka.

Шаг 5: Создайте потребителя Kafka

Теперь давайте создадим потребителя Kafka, который будет прослушивать прогнозы нашей модели машинного обучения и отображать их в консоли. Создайте новый файл с именем consumer.py и добавьте следующий код:

from kafka import KafkaConsumer

consumer = KafkaConsumer('house-prices', bootstrap_servers='localhost:9092', auto_offset_reset='earliest')

for message in consumer:
    prediction = json.loads(message.value.decode('utf-8'))['prediction']
    print(f"Received prediction: {prediction}")

Этот код создает потребителя Kafka, который прослушивает сообщения в теме house-prices. Когда сообщение получено, оно извлекает значение предсказания и выводит его на консоль.

Шаг 6: Запустите приложение

Теперь, когда у нас есть все элементы, давайте запустим приложение. Начните с запуска веб-приложения Flask:

$ python app.py

Затем откройте новое окно терминала и запустите потребителя Kafka:

$ python consumer.py

Наконец, откройте веб-браузер и перейдите к http://localhost:5000. Вы должны увидеть простую форму, позволяющую вводить характеристики дома. Введите некоторые образцы данных и нажмите кнопку «Предсказать». Вы должны увидеть прогнозируемую цену, напечатанную в консоли, где работает потребитель Kafka.

Поздравляем! Вы успешно создали приложение с искусственным интеллектом, используя ML и Apache Kafka. Теперь вы можете настроить это приложение в соответствии со своими конкретными потребностями или использовать его в качестве отправной точки для своих собственных проектов.

Если вам понравилось читать это руководство и оно оказалось полезным, поддержите меня в Купи мне кофе 😎

Дополнительные материалы на PlainEnglish.io.

Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Подпишитесь на нас в Twitter, LinkedIn, YouTube и Discord .

Заинтересованы в масштабировании запуска вашего программного обеспечения? Ознакомьтесь с разделом Схема.