Узнайте, как отслеживать цены на золото в реальном времени с помощью Apache Kafka Pandas. Нанесите последние цены на гистограмму.
Примечание от редакторов Data Science. Хотя мы разрешаем независимым авторам публиковать статьи в соответствии с нашими правилами и рекомендациями, мы не поддерживаем вклад каждого автора. Не стоит полагаться на работы автора без консультации с профессионалами. См. Подробности в наших Условиях для читателей.
Мы живем в эпоху, когда отслеживание, обработка и анализ данных в реальном времени становятся необходимостью для многих предприятий. Излишне говорить, что обработка потоковых наборов данных становится одним из наиболее важных и востребованных навыков для инженеров по обработке данных и ученых.
В этой статье я предполагаю, что вы знакомы с Apache Kafka - платформой распределенной потоковой передачи событий с открытым исходным кодом. Apache Kafka имеет встроенные механизмы секционирования, репликации и отказоустойчивости. В прошлом я использовал Apache Kafka в нескольких проектах для нескольких вариантов использования, включая сбор метрик, сбор журналов и потоковую обработку.
Apache Kafka - горячая тема для обсуждения с моими студентами в моем курсе Data Engineering and Data Science. Из множества примеров, которые я рассмотрел в ходе курса, я решил поделиться этим.
Мы будем принимать потоковые данные из следующего API в Kafka.
Https://forex-data-feed.swissquote.com/public-quotes/bboquotes/instrument/XAU/USD
Мы начнем с создания Kafka Producer, который будет считывать данные из API выше и непрерывно отправлять данные в Kafka.
Мы также создадим Kafka Consumer, который будет постоянно считывать данные из Kafka. как только сообщение будет прочитано, мы извлечем последнюю цену на золото и нанесем результаты на гистограмму.
Технический стек: Apache Kafka, Pandas, Python, библиотека kafka-python и MatPlotLib
Подготовительные работы
Установить Java
sudo yum -y install java-1.8.0-openjdk.x86_64
Скачать Apache Kafka
$ cd /opt $ sudo wget http://apache.forsale.plus/kafka/2.5.0/kafka_2.13-2.5.0.tgz $ sudo tar -zxf kafka_2.13-2.5.0.tgz $ cd kafka_2.13-2.5.0
Запустите Zookeeper и Kafka Server в фоновом режиме
$ cd /opt/kafka_2.13-2.5.0;export JAVA_HOME=/usr/lib/jvm/jre-1.8.0;export JRE_HOME=/usr/lib/jvm/jre $ nohup sudo /bin/zookeeper-server-start.sh config/zookeeper.properties & $ nohup sudo bin/kafka-server-start.sh config/server.properties &
Создайте новую тему Kafka - Goldrates
$ sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Goldrates $ sudo bin/kafka-topics.sh --list --zookeeper localhost:2181
Клонировать мой репозиторий Github:
$ git clone https://github.com/mkukreja1/blogs.git
Создайте продюсера Kafka
Установите ноутбук Kafka Producer в Jupyter. Я собираюсь выполнить шаги, чтобы понять поток.
kafka / goldrates / kafkaProducerGoldrates.ipynb
import sys !{sys.executable} -m pip install kafka-python import time import json from json import dumps from kafka import KafkaProducer from time import sleep import requests as req
KafkaProducer - класс высокого уровня для создания асинхронного производителя сообщений.
ticker_url="https://forex-data-feed.swissquote.com/public-quotes/bboquotes/instrument/XAU/USD" brokers='localhost:9092' topic='Goldrates' sleep_time=300
Объявите некоторые константы, которые будут использоваться в программе. брокеры могут быть списком брокеров, участвующих в вашем кластере Apache Kafka.
producer = KafkaProducer(bootstrap_servers=[brokers],value_serializer=lambda x: dumps(x).encode('utf-8'))
Это инициализирует новый производитель Kafka. Этот производитель будет сериализовать данные перед их отправкой в список брокеров. Данные будут преобразованы в JSON и закодированы с использованием utf-8.
while(True): print("Getting new data...") resp = req.get(ticker_url) json_data = json.loads(resp.text) producer.send(topic, json_data) time.sleep(sleep_time)
Этот код работает в бесконечном цикле. Он извлекает данные из API Goldrates, преобразует входящий объект JSON в объект Python и, наконец, отправляет эти данные в определенную тему в Kafka. После каждой итерации он спит в течение константы period = sleep_time, объявленной ранее.
Если у вас в Kafka Producer все работает нормально, вы должны увидеть следующий результат:
Получение новых данных…
Получение новых данных…
Оставьте эту записную книжку включенной, чтобы потребитель Kafka мог использовать эти сообщения.
Создайте потребителя Kafka
Установите ноутбук Kafka Consumer в Jupyter. Как это было сделано ранее, я собираюсь выполнить шаги для понимания потока.
kafka / goldrates / kafkaConsumerGoldrates.ipynb
import time import json from kafka import KafkaConsumer from pandas import DataFrame from datetime import datetime import matplotlib.pyplot as plt
KafkaConsumer - класс высокого уровня для создания потребителя асинхронных сообщений.
brokers='localhost:9092' topic='Goldrates' sleep_time=300 offset='latest'
Объявите некоторые константы, которые будут использоваться в программе. брокеры могут быть списком брокеров, участвующих в вашем кластере Apache Kafka. Offset = latest означает, что код должен получить последнее сообщение из очереди Kafka. У вас также есть возможность начать с самого раннего .
consumer = KafkaConsumer(bootstrap_servers=brokers, auto_offset_reset=offset,consumer_timeout_ms=1000) consumer.subscribe([topic])
Это инициализирует новый клиент Kafka.
auto_offset_reset - используется только в том случае, если потребитель сталкивается с неожиданными проблемами. В этом случае мы просим потребителя перезапустить чтение сообщений из последнего сообщения. У вас также есть возможность начать с самого раннего .
auto_commit_interval_ms - период времени между двумя фиксациями.
Наконец, мы просим потребителя подписаться, то есть получать сообщения из темы, определенной как константа.
oldprice_dict = {} while(True): for message in consumer: #print(message) d=json.loads(message.value) df=DataFrame(d) for x in range(1): #print(x) new_price=df['spreadProfilePrices'][x][0] ts=df['ts'][x] #print(new_price) print("Latest Gold Price: " + str(new_price['ask'])) datetime_time = datetime.fromtimestamp(ts/1000) goldprice_dict[str(datetime_time)]=new_price['ask'] print("Gold Price at: " + str(datetime_time)) print("-----------------------------------------") #print(goldprice_dict) plt.figure(figsize=(20,10)) plt.bar(range(len(goldprice_dict)), list(goldprice_dict.values()), align='center',linewidth=0.5) plt.xticks(range(len(goldprice_dict)), list(goldprice_dict.keys())) plt.show() time.sleep(sleep_time)
Для каждого сообщения JSON загружается в фрейм данных Pandas. Последняя цена на золото и временная метка извлекаются из Dataframe и сохраняются в словаре Python. Наконец, словарь Python используется для построения гистограммы следующим образом:
Ось X - отметка времени чтения цены на золото
Ось Y - Цена на золото
Если все работает хорошо, для каждой итерации последняя цена в золоте будет отображаться следующим образом:
Ожидаемый результат ниже:
Последняя цена на золото: 1902,951
Цена на золото в: 2020–10–26 15:16:16
- - - - - - - - - - - - - - - - - - - - - -
Последняя цена на золото: 1902,968
Цена на золото в: 2020–10–26 15:21:17
- - - - - - - - - - - - - - - - - - - - - -
Последняя цена на золото: 1902,208
Цена на золото в: 2020–10–26 15:26:18
- - - - - - - - - - - - - - - - - - - - - -
Последняя цена на золото: 1902,293
Цена на золото в: 2020–10–26 15:31:19
- - - - - - - - - - - - - - - - - - - - - -
Последняя цена на золото: 1902,693
Цена на золото в: 2020–10–26 15:36:19
- - - - - - - - - - - - - - - - - - - - - -
Весь код для этой статьи можно найти по ссылке ниже:
Надеюсь, эта статья была полезной. Тема Apache Kafka рассматривается более подробно в рамках курса Big Data Hadoop, Spark & Kafka, предлагаемого Datafence Cloud Academy.