Узнайте, как отслеживать цены на золото в реальном времени с помощью Apache Kafka Pandas. Нанесите последние цены на гистограмму.

Примечание от редакторов Data Science. Хотя мы разрешаем независимым авторам публиковать статьи в соответствии с нашими правилами и рекомендациями, мы не поддерживаем вклад каждого автора. Не стоит полагаться на работы автора без консультации с профессионалами. См. Подробности в наших Условиях для читателей.

Мы живем в эпоху, когда отслеживание, обработка и анализ данных в реальном времени становятся необходимостью для многих предприятий. Излишне говорить, что обработка потоковых наборов данных становится одним из наиболее важных и востребованных навыков для инженеров по обработке данных и ученых.

В этой статье я предполагаю, что вы знакомы с Apache Kafka - платформой распределенной потоковой передачи событий с открытым исходным кодом. Apache Kafka имеет встроенные механизмы секционирования, репликации и отказоустойчивости. В прошлом я использовал Apache Kafka в нескольких проектах для нескольких вариантов использования, включая сбор метрик, сбор журналов и потоковую обработку.

Apache Kafka - горячая тема для обсуждения с моими студентами в моем курсе Data Engineering and Data Science. Из множества примеров, которые я рассмотрел в ходе курса, я решил поделиться этим.

Мы будем принимать потоковые данные из следующего API в Kafka.

Https://forex-data-feed.swissquote.com/public-quotes/bboquotes/instrument/XAU/USD

Мы начнем с создания Kafka Producer, который будет считывать данные из API выше и непрерывно отправлять данные в Kafka.

Мы также создадим Kafka Consumer, который будет постоянно считывать данные из Kafka. как только сообщение будет прочитано, мы извлечем последнюю цену на золото и нанесем результаты на гистограмму.

Технический стек: Apache Kafka, Pandas, Python, библиотека kafka-python и MatPlotLib

Подготовительные работы

Установить Java

sudo yum -y install java-1.8.0-openjdk.x86_64

Скачать Apache Kafka

$ cd /opt
$ sudo wget http://apache.forsale.plus/kafka/2.5.0/kafka_2.13-2.5.0.tgz
$ sudo tar -zxf kafka_2.13-2.5.0.tgz
$ cd kafka_2.13-2.5.0

Запустите Zookeeper и Kafka Server в фоновом режиме

$ cd /opt/kafka_2.13-2.5.0;export JAVA_HOME=/usr/lib/jvm/jre-1.8.0;export JRE_HOME=/usr/lib/jvm/jre
$ nohup sudo /bin/zookeeper-server-start.sh config/zookeeper.properties &
$ nohup sudo bin/kafka-server-start.sh config/server.properties &

Создайте новую тему Kafka - Goldrates

$ sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1     --partitions 1 --topic Goldrates
$ sudo bin/kafka-topics.sh --list --zookeeper localhost:2181

Клонировать мой репозиторий Github:

$ git clone https://github.com/mkukreja1/blogs.git

Создайте продюсера Kafka

Установите ноутбук Kafka Producer в Jupyter. Я собираюсь выполнить шаги, чтобы понять поток.

kafka / goldrates / kafkaProducerGoldrates.ipynb

import sys
!{sys.executable} -m pip install kafka-python
import time
import json
from json import dumps
from kafka import KafkaProducer
from time import sleep
import requests as req

KafkaProducer - класс высокого уровня для создания асинхронного производителя сообщений.

ticker_url="https://forex-data-feed.swissquote.com/public-quotes/bboquotes/instrument/XAU/USD"
brokers='localhost:9092'
topic='Goldrates'
sleep_time=300

Объявите некоторые константы, которые будут использоваться в программе. брокеры могут быть списком брокеров, участвующих в вашем кластере Apache Kafka.

producer = KafkaProducer(bootstrap_servers=[brokers],value_serializer=lambda x: dumps(x).encode('utf-8'))

Это инициализирует новый производитель Kafka. Этот производитель будет сериализовать данные перед их отправкой в ​​список брокеров. Данные будут преобразованы в JSON и закодированы с использованием utf-8.

while(True):
    print("Getting new data...")
    resp = req.get(ticker_url)
    json_data = json.loads(resp.text)
    producer.send(topic, json_data)
    time.sleep(sleep_time)

Этот код работает в бесконечном цикле. Он извлекает данные из API Goldrates, преобразует входящий объект JSON в объект Python и, наконец, отправляет эти данные в определенную тему в Kafka. После каждой итерации он спит в течение константы period = sleep_time, объявленной ранее.

Если у вас в Kafka Producer все работает нормально, вы должны увидеть следующий результат:

Получение новых данных…
Получение новых данных…

Оставьте эту записную книжку включенной, чтобы потребитель Kafka мог использовать эти сообщения.

Создайте потребителя Kafka

Установите ноутбук Kafka Consumer в Jupyter. Как это было сделано ранее, я собираюсь выполнить шаги для понимания потока.

kafka / goldrates / kafkaConsumerGoldrates.ipynb

import time
import json
from kafka import KafkaConsumer
from pandas import DataFrame
from datetime import datetime
import matplotlib.pyplot as plt

KafkaConsumer - класс высокого уровня для создания потребителя асинхронных сообщений.

brokers='localhost:9092'
topic='Goldrates'
sleep_time=300
offset='latest'

Объявите некоторые константы, которые будут использоваться в программе. брокеры могут быть списком брокеров, участвующих в вашем кластере Apache Kafka. Offset = latest означает, что код должен получить последнее сообщение из очереди Kafka. У вас также есть возможность начать с самого раннего .

consumer = KafkaConsumer(bootstrap_servers=brokers, auto_offset_reset=offset,consumer_timeout_ms=1000)
consumer.subscribe([topic])

Это инициализирует новый клиент Kafka.

auto_offset_reset - используется только в том случае, если потребитель сталкивается с неожиданными проблемами. В этом случае мы просим потребителя перезапустить чтение сообщений из последнего сообщения. У вас также есть возможность начать с самого раннего .

auto_commit_interval_ms - период времени между двумя фиксациями.

Наконец, мы просим потребителя подписаться, то есть получать сообщения из темы, определенной как константа.

oldprice_dict = {}
while(True):
    for message in consumer:
        #print(message)
        d=json.loads(message.value)
        df=DataFrame(d)
        for x in range(1):
            #print(x)
            new_price=df['spreadProfilePrices'][x][0]
            ts=df['ts'][x]
            #print(new_price)
            print("Latest Gold Price: " + str(new_price['ask']))
            datetime_time = datetime.fromtimestamp(ts/1000)
            goldprice_dict[str(datetime_time)]=new_price['ask']
            print("Gold Price at: " + str(datetime_time))
        print("-----------------------------------------")
        #print(goldprice_dict)
        plt.figure(figsize=(20,10))
        plt.bar(range(len(goldprice_dict)), list(goldprice_dict.values()), align='center',linewidth=0.5)
        plt.xticks(range(len(goldprice_dict)), list(goldprice_dict.keys()))
        plt.show()
        
    time.sleep(sleep_time)

Для каждого сообщения JSON загружается в фрейм данных Pandas. Последняя цена на золото и временная метка извлекаются из Dataframe и сохраняются в словаре Python. Наконец, словарь Python используется для построения гистограммы следующим образом:

Ось X - отметка времени чтения цены на золото

Ось Y - Цена на золото

Если все работает хорошо, для каждой итерации последняя цена в золоте будет отображаться следующим образом:

Ожидаемый результат ниже:

Последняя цена на золото: 1902,951
Цена на золото в: 2020–10–26 15:16:16
- - - - - - - - - - - - - - - - - - - - - -

Последняя цена на золото: 1902,968
Цена на золото в: 2020–10–26 15:21:17
- - - - - - - - - - - - - - - - - - - - - -

Последняя цена на золото: 1902,208
Цена на золото в: 2020–10–26 15:26:18
- - - - - - - - - - - - - - - - - - - - - -

Последняя цена на золото: 1902,293
Цена на золото в: 2020–10–26 15:31:19
- - - - - - - - - - - - - - - - - - - - - -

Последняя цена на золото: 1902,693
Цена на золото в: 2020–10–26 15:36:19
- - - - - - - - - - - - - - - - - - - - - -

Весь код для этой статьи можно найти по ссылке ниже:



Надеюсь, эта статья была полезной. Тема Apache Kafka рассматривается более подробно в рамках курса Big Data Hadoop, Spark & ​​Kafka, предлагаемого Datafence Cloud Academy.