До недавнего времени большинство компаний использовали традиционный подход для хранения всех данных компании в хранилище данных.

Рост Интернета привел к увеличению количества источников данных и огромных объемов данных, которые нужно было хранить, что потребовало постоянного масштабирования этих хранилищ данных. Они не были предназначены для обработки петабайтов данных, поэтому компании были вынуждены использовать платформы больших данных (такие как Hadoop), способные преобразовывать большие данные в полезные идеи с высокой скоростью.

Hadoop традиционно сочетал хранилище и вычисления вместе, поэтому это было относительно дорого. Каждый узел кластера был не просто системой хранения, это был сервер с процессорами и ресурсами памяти. Чем больше данных вы храните, тем больше вычислительных ресурсов вам нужно для оплаты, даже если на самом деле вам не нужны дополнительные вычислительные мощности для анализа данных. В результате предполагалось, что вычислительные ресурсы и хранилище должны масштабироваться вместе, а кластеры постоянно были включены, в противном случае данные становились недоступными.

Это сила, которая движет отделением хранилища от вычислений, чтобы иметь возможность масштабировать их по отдельности. Так появился термин озеро данных, определяемый как совокупность экземпляров хранилища различных активов данных в дополнение к исходным источникам данных.

Озеро данных - это репозиторий, способный хранить огромные объемы данных в различных форматах. Он может содержать данные из журналов веб-сервера, баз данных приложений и сторонних данных. Данные могут поступать в озеро данных путем пакетной обработки потоковых данных или обработки в реальном времени.

Причина создания озера данных - предоставить возможность запрашивать все источники данных, объединенные в одном месте, предоставляя «Данные и аналитика как услуга» для бизнес-пользователей, таких как специалисты по обработке данных и бизнес-аналитики.

Озеро данных предлагает подход, при котором вычисления и хранилище могут быть разделены, в нашем случае S3 используется в качестве хранилища объектов, а любые механизмы обработки (Spark, Presto и т. Д.) Могут использоваться для вычислений. Это означает, что мы можем масштабироваться отдельно в зависимости от потребностей бизнеса.

Как я объяснял в одном из моих предыдущих постов, мы решили создать озеро данных на основе Amazon Simple Storage Service (S3), чтобы объединить частные и сторонние данные, чтобы пользователи могли отвечать на их самые интересные бизнес-вопросы.

Мы используем AWS Kinesis Firehose для передачи данных в S3, лямбда-функции AWS для некоторой предварительной обработки и Spark для наших конвейеров данных.

Чтобы запросить S3, нам понадобился механизм запросов, который позволил бы нам использовать SQL для предоставления доступа к ТБ структурированных и полуструктурированных данных для быстрого анализа. Мы оценили Presto, Athena и Spark SQL.

Выходные данные располагаются в S3 в следующем формате (разделы, похожие на hadoop):

‹Bucket_name› \ processing \ ‹event_type› \ dt = ‹date› \ hh = ‹time› \ *. Json

В Presto и Athena (управляемая версия Presto от AWS) мы столкнулись с ошибками либо при создании таблиц с помощью HIVE, либо при запросе этих таблиц. Мы обнаружили, что и Presto, и Athena не «любят» типы данных, которые меняются со временем (например: временная метка была строкой, а затем bigint) или любые специальные символы в начале имен полей (_data_type). Мы исправили данные, используя Spark и лямбды предварительной обработки, поэтому мы думали, что этого будет достаточно, но этого было недостаточно, у нас была очень низкая производительность при запросе небольших файлов JSON. Мы знали, что Parquet будет работать намного лучше, но разделы требовали файлов меньшего размера. Так что мы сделали?

Мы решили попробовать Spark SQL в качестве нашего механизма запросов. Spark SQL предоставляет возможность предоставлять наборы данных Spark через JDBC API и позволяет выполнять SQL-подобные запросы к данным Spark с использованием традиционных инструментов бизнес-аналитики и визуализации.

Вдобавок к этому нашим аналитикам и специалистам по обработке данных требовались надежные инструменты визуализации, чтобы находить закономерности и корреляции данных.

Нашим выбором был Apache Zeppelin. Zeppelin - это веб-блокнот, который обеспечивает управляемую данными, интерактивную аналитику данных и совместную работу с документами с помощью SQL, Scala, Python и др. Он предоставляет аналитикам отличный способ создавать интерактивные веб-записные книжки для написания запросов и визуализации результатов. Записными книжками можно делиться в режиме реального времени. Запросы могут быть запланированы.

Руководство

1- Создайте кластер в AWS EMR с помощью Spark и Zeppelin.

2- Щелкните ссылку «Zeppelin» после того, как кластер был подготовлен для доступа к пользовательскому интерфейсу Zeppelin.

3- Загрузите образцы данных для данных о дорожно-транспортных происшествиях в Сан-Франциско. (его можно найти по адресу: https://data.sfgov.org/api/views/vv57-2fgy/rows.csv?accessType=DOWNLOAD).

4- Поместите его в ведро S3, например: «test-zeppelin-ni».

Теперь мы можем получать доступ к данным и запрашивать их с помощью Spark SQL и Zeppelin.

Мы напишем некоторый код Scala внутри Zeppelin, чтобы визуализировать этот файл CSV и извлечь содержащуюся в нем информацию. Чтобы просмотреть содержимое этого файла и управлять данными, мы создадим фрейм данных, привяжем его к схеме, создадим временную таблицу, а затем будем использовать SQL для запросов.

«Мощная особенность Spark SQL заключается в том, что вы можете программно привязать схему к источнику данных и сопоставить ее с классами сценариев Scala, по которым можно перемещаться и запрашивать безопасный тип».

импортировать org.apache.spark.sql.types. {StructType, StructField, StringType, IntegerType};
импортировать org.apache.spark.sql. {DataFrame, Dataset, Encoders, SparkSession} < br /> import org.joda.time.DateTime
case class TrafficData (IncidntNum: String, Category: String, Descript: String, DayOfWeek: String,
Date: String, Time: String, PdDistrict: String , Разрешение: Строка,
Адрес: Строка,
X: Двойное, Y: Двойное, Местоположение: Строка, PdId: Строка)

Схема val: StructType = Encoders.product [TrafficData] .schema

val df = sqlContext.read.schema (schema) .csv («s3a: // test-zeppelin-ni / *»)
df.printSchema ()

Вот второй абзац:

df.registerTempTable («Таблица трафика»)

Вот третий абзац:

% sql

выберите * из TrafficTable

Как видите, после загрузки данных в Spark мы создали временную таблицу, а затем можем запросить ее с помощью обычного SQL.

Подведение итогов

Я надеюсь, что эта статья дала вам хорошую отправную точку для изучения того, как использовать Spark и Zeppelin.

В этой статье мы узнали, как читать большой набор данных из публичной корзины S3 и выполнять SQL-запросы к нему. Эта комбинация - одна из наиболее часто используемых настроек для проектов машинного обучения, выполняющих масштабные данные.

Примечание: помните, что изменения, которые вы вносите в Zeppelin, сохраняются только до тех пор, пока работает кластер EMR.

Если вы хотите узнать больше о Zeppelin посмотрите это видео Мун Су Ли, создателя Zeppelin, выступающего в Амстердаме на Spark Summit Europe 2015.