Какой смысл просыпаться после того, как в твой дом вломились? Вам не кажется, что бесполезно предупреждать себя после того, как катастрофа уже сильно ударила по вам? Мы должны знать об этих вещах в режиме реального времени. Верно???

У нас есть очень хорошаяфраза на хинди

“अब पछताये होत क्या जब चिड़ियाँ चुग गई खेत”

Это означает: - Когда птицы испортили поле, они съели семена, и там ничего не осталось,

Почему сейчас так грустно?

Мораль такова: Делайте все вовремя”.

Не действуйте, когда уже слишком поздно. Тогда вам будет жаль.

Чтобы решить эти проблемы, у нас есть «SPARK STREAMING».

Что мы рассмотрим в этой статье?

  • Что такое потоковая передача данных?
  • Важность паровых данных
  • Проблемы в приложениях потоковой передачи данных
  • Что такое искровой поток?
  • Потоковая передача данных в режиме реального времени с использованием Socket || питон

Поехали!! (🌟)

Что такое потоковая передача данных?

Слово «потоковая передача» используется для описания непрерывного, бесконечного потока данных без начальной и конечной точки,

Потоковая передача данных — это процесс передачи непрерывного потока данных, известного как поток.

Поток данных состоит из ряда данных, упорядоченных во времени. Например, потоки данных включают данные датчиков, журналы транзакций, данные IOT, журналы веб-браузера и многое другое.

Мы можем думать о потоковой передаче данных как о конвейерной ленте, несущей данные и хранящей их в базе данных, которую впоследствии можно использовать для обнаружения мошенничества в режиме реального времени, мониторинга состояния, отправки предупреждений и визуализации данных в режиме реального времени, что может быть очень полезным.

Важность потоковой передачи данных:

Сегодняшние данные генерируются бесконечным количеством источников, они могут быть датчиками данных IoT, веб-журналами, игровыми приложениями, журналами безопасности, серверами и всем остальным, и практически невозможно контролировать и регулировать данные.

Итак, приложению, работающему с потоками данных, необходимы некоторые функции: Хранение и обработка данных,

Хранилище должно быть способно обрабатывать большие объемы данных, поступающих в режиме реального времени.

Обработка данных должна иметь возможность обрабатывать и отправлять соответствующие требования

Реальные примеры потоковой передачи данных в реальном времени

  • Обнаружение мошенничества
  • Многопользовательские игры в реальном времени
  • Данные датчика Интернета вещей
  • активность клиентов
  • Обнаружение языка ненависти в режиме реального времени (социальные сети)
  • Ленты социальных сетей
  • Прогнозирование AL/ML в режиме реального времени

И так далее по списку…

Проблемы в приложениях потоковой передачи данных

  • Масштабируемость. Нам нужна система, которая может хранить и масштабировать в соответствии с потребностями, поскольку у нас есть данные, поступающие в большом количестве.
  • Отказоустойчивость и гарантии данных.Данные поступают из разных источников, мест, форматов и с разной скоростью. Для обработки и хранения всех этих данных наша система должна быть отказоустойчивой и гарантировать, что все данные, отправленные пользователем, также принимаются нашим конвейером потоковой передачи, который обеспечивает гарантии данных.
  • Упорядочивание.Поскольку наши данные поступают с разной скоростью, нам нужно убедиться, что данные в порядке, иначе это не будет иметь никакого смысла, и наш конвейер потоковой обработки должен знать о свойствах транзакций данных. .

Чтобы поддерживать все эти свойства, мы собираемся использовать хорошо известную «Apache Spark» и обрабатывать данные, поступающие в режиме реального времени, с помощью сокетов.

Что такое Spark Streaming:-

Spark Streaming — это очень быстрый движок для обработки высоких столбцов данных, который в 100 раз быстрее, чем MapReduce от Google. Причина в том, что он использует распределенную обработку данных, которая создает небольшие фрагменты данных и выполняет параллельные вычисления на серверах.

Spark Streaming использует возможности быстрого планирования Spark Core для выполнения потоковой аналитики. Он принимает данные в мини-пакетах и ​​выполняет преобразования RDD для этих мини-пакетов данных. Этот дизайн позволяет использовать тот же набор кода приложения, написанный для пакетной аналитики, в потоковой аналитике, что облегчает реализацию лямбда-архитектуры.

Однако это удобство сопряжено со штрафом за задержку, равную продолжительности мини-пакета. Другие механизмы потоковой передачи данных, которые обрабатывают событие за событием, а не в мини-пакетах, включают Storm и потоковый компонент Flink. Spark Streaming имеет встроенную поддержку для использования из сокетов Kafka, Flume, Twitter, ZeroMQ, Kinesis и TCP/IP. [вики]

Spark Streaming — одна из важнейших частей больших данных. Которые принимают данные из разных источников в режиме реального времени, обрабатывают их и применяют к ним модели Al/ML и другие функции для их хранения в базе данных или выполнения действий в режиме реального времени.

Потоковая передача данных в реальном времени с использованием сокета || Питон

Шаг 1. Создание сеанса Spark:

Сеанс Spark — это точка входа приложения spark, в которой мы можем определить имя нашего приложения spark, которое должно быть уникальным, поскольку spark идентифицирует работающий кластер на основе имени приложения. Создание сеанса искры

# creating spark session
spark = SparkSession.builder.appName(“SocketExample”).master(‘local[*]’).getOrCreate()

Шаг 2. Создайте функцию для чтения данных из сокетов.

  • Необходимо создать stream_df, который содержит данные, поступающие из сокетов в некотором порту и имени хоста.
# loading/reading data from socket
stream_df = spark.readStream.format(‘socket’).option(“host”,host).option(“port”,port).load()

Шаг 3. Проверьте, передаются ли данные, и распечатайте схему

# check if still streaming
print(stream_df.isStreaming)
stream_df.printSchema()

Шаг 4. Запишите полученные данные в консоль (мы также можем сохранить эти данные, мы увидим это в следующих статьях)

# write stream in console
write_query = stream_df.writeStream.format(‘console’).start()

Шаг 5. Дождитесь закрытия сокета

write_query.awaitTermination()

Вот и все, теперь давайте посмотрим полный код

Полный код:-

После этого откройте 2 терминала и в одном терминале запустите сервер с помощью «netcat -lk 1100», а в другом сервере запустите «socket_streaming.py» и WALLA!!

Как видите, мы можем отправлять данные и получать их в режиме реального времени в пакетном режиме.

Итак, в следующих статьях мы узнаем о том, как мы используем триггеры для выполнения некоторых действий и как мы можем выполнять преобразование данных в режиме реального времени, и многое другое, так что следите за обновлениями…

На этом пока все, увидимся в следующей статье.

Я предоставил ссылки на изображения для тех изображений, которые не являются моими.

давайте общаться в Linkedin, Twitter, Instagram, Github и Facebook.

Спасибо, что прочитали!