Включение масштабирования данных карты по требованию с помощью пространственного разделения

Zendrive помогает водителям безопаснее управлять автомобилем. Мы делаем это, собирая данные о поведении при вождении через наших партнеров и анализируя риски водителя, чтобы рассчитать оценку риска на основе поездок, совершенных водителем.

Мы получаем данные о поездках для водителя, включая пространственную информацию в виде точек GPS. Затем мы выполняем пространственные операции, такие как нахождение состояния или почтового индекса для местоположения, с данными, а затем используем результаты для расчета оценки риска водителя.

Наш метод выполнения пространственных вычислений

Но сначала пара терминов.

  1. POSTGIS - PostGIS - это расширение системы объектно-реляционной базы данных PostgreSQL, которое позволяет хранить объекты ГИС (географических информационных систем) в базе данных. См. Документы POSTGIS.
  2. БД OSM - мы загружаем данные открытой карты улиц (OSM) в базу данных postgres и используем POSTGIS для выполнения пространственных расчетов. Мы будем ссылаться на эту базу данных postgres, содержащую данные OSM, как на базу данных OSM до конца статьи.
  3. Spark - Apache Spark - это быстрая и универсальная кластерная вычислительная система (см. Документацию Apache). В Spark вычисления выполняются с использованием исполнителей, которые являются рабочими узлами, отвечающими за выполнение отдельных задач в искровом задании.

Для моделирования рисков в Zendrive мы делаем следующие расчеты:

  • Рассчитайте расстояние поездки по шоссе.
  • Если поездка проходит по нескольким почтовым индексам, нам нравится рассчитывать пройденное расстояние по каждому почтовому индексу.
  • Прочие аналогичные пространственные расчеты.

Для этого мы используем искровые работы. Исполнители Spark устанавливают соединение с OSM DB, запускают пространственный запрос и получают результаты для дальнейшей обработки.

Ограничения OSM DB

Описанный метод пространственного вычисления имеет проблему с масштабированием. OSM DB, будучи базой данных СУБД, ограничивает количество одновременных подключений к ней. В нашем случае это число составляет около 300–350.

Выбор более крупной базы данных для увеличения количества подключений приведет к тому, что база данных будет простаивать большую часть времени, поскольку наши искровые задания выполняются по запросу. Это ограничивает количество исполнителей, используемых в нашей искровой задаче, и становится узким местом при масштабировании по требованию для наших искровых заданий.

Обработка этих пакетных заданий может занять несколько часов, и с увеличением трафика это время линейно увеличивается. Мы должны найти масштабируемый способ выполнения пространственных вычислений, а также убедиться, что время и ресурсы, необходимые для запуска нашего конвейера, не увеличиваются значительно.

Масштабирование этого подхода

Наш подход к решению проблемы заключался в устранении первопричины. Если бы мы могли удалить нашу зависимость от OSM DB, мы могли бы масштабироваться. Но как?

Наше решение было простым. Можем ли мы каким-то образом выполнять все вычисления в самом искровом исполнителе без необходимости подключаться к базе данных OSM? Ответ положительный.

Здесь на помощь приходит Shapely. Судя по их документам,

Shapely - это пакет Python под лицензией BSD для управления плоскими геометрическими объектами и их анализа.

Первая предпосылка Shapely заключается в том, что программисты Python должны иметь возможность выполнять геометрические операции типа PostGIS вне РСУБД.

Shapely помогает нам выполнять пространственные вычисления локально с использованием Python вместо использования базы данных OSM. Но поскольку Shapely не поддерживает операции индексирования, мы используем Rtree, который предоставляет ряд расширенных функций пространственного индексирования для индексирования наших пространственных данных.

Shapely принимает все данные в двухмерной координатной плоскости. Нам нужно преобразовать наши данные в систему координат, в которой Shapely может выполнять геометрические операции (например, epsg: 3857).

В нашем случае мы преобразовали данные БД OSM, представленные в epsg: 4326, в epsg: 3857, используя Pyproj (библиотека преобразования координат в Python). Вам может понадобиться подобное преобразование, если вы хотите использовать Shapely для пространственных вычислений.

Shapely - это просто инструмент для вычислений, поэтому нам нужно преобразовать геоданные, присутствующие в базе данных OSM, в объекты Shapely, над которыми можно выполнять пространственные операции. Идея заключалась в том, чтобы у каждого исполнителя были локальные геоданные, чтобы он мог выполнять вычисления независимо, и полностью удалить зависимость OSM DB.

Это вызывает еще одну проблему

Для небольших таблиц в базе данных OSM (например, состояний или почтовых индексов) мы можем напрямую преобразовать данные в объекты Shapely и загрузить их в наши искровые исполнители. Но мы не можем позволить себе полностью загружать в наши исполнители большие таблицы, такие как North America Line (содержащие данные о шоссе). Это может привести к ошибкам нехватки памяти.

Нам нужно найти способ минимизировать количество данных, загружаемых в каждый исполнитель.

Решение: разделяй и властвуй

Для решения этой проблемы мы использовали технику хранения данных, используемую для цифровых карт. Для отображения карты на экране весь мир делится на определенное количество плиток в зависимости от уровня масштабирования. Когда вы запрашиваете данные для региона, загружаются только плитки для этого региона.

Подобно этому, мы сохранили данные о шоссе для каждой плитки вместо хранения полных данных. Теперь мы можем загружать только данные тайлов, необходимые для обработки поездок в этом исполнителе, а не все данные Северной Америки.

Но простое разделение данных на плитки и загрузка только необходимых плиток для вычислений не даст никаких преимуществ. Почему?

Как показано на рисунке 5, наши данные о поездках распределяются по исполнителям искры, и каждый исполнитель должен загружать плитки, необходимые для обработки каждой поездки, присутствующей в нем. Поскольку для разных поездок в исполнителе требуются разные тайлы для обработки, мы в конечном итоге загружаем большое количество тайлов в каждый исполнитель, что приводит к одной и той же проблеме.

Поэтому мы пробовали различные методы решения этой проблемы:

1. Загрузить, обработать и стереть

В этом методе мы можем загрузить данные плитки для одной поездки, обработать, а затем стереть данные плитки для этой поездки. Таким образом, в исполнитель в любой момент будут присутствовать только данные одного тайла, и мы успешно решили проблему с загрузкой данных.
Но, как очевидно, нам нужно будет снова и снова загружать одни и те же данные плитки в одном и том же исполнителе. Кроме того, может потребоваться загрузка одной плитки несколькими исполнителями. Это увеличит время, необходимое для нашей искровой работы.

2. Интеллектуальное перераспределение данных в искре.

Хорошим методом решения проблемы было бы сгруппировать тайлы одной и той же области в один исполнитель, как показано на рисунке 6. Таким образом, каждый исполнитель загрузит данные тайлов соответствующей области, и будет загружено меньшее общее количество тайлов.

Более того, поездки в исполнителе теперь принадлежат одному региону и в конечном итоге требуют для обработки одних и тех же данных плитки, поэтому мы можем повторно использовать одни и те же данные, не загружая их несколько раз.

Для этого мы подумали о перераспределении данных о поездках на основе плиток, с которых путешествие начинается. Мы находим начальную плитку для поездок из начальной точки каждой поездки и перераспределяем данные поездки между исполнителями. Теперь поездки, относящиеся к одному региону, находятся в одном исполнителе. Мы также сохраним данные плитки, загруженные один раз, в исполнитель, чтобы их можно было повторно использовать в других поездках того же исполнителя.

Это решает проблему перегрузки памяти за счет меньшего количества поездок. Кроме того, использование одних и тех же данных плитки для обработки нескольких поездок сокращает время, затрачиваемое на загрузку одной и той же плитки несколько раз. Мы можем поразить двух зайцев одним выстрелом.

Поддержание качества структуры и удобочитаемости нашего кода

Использование этой комбинации Shapely и Rtree решает нашу проблему. Но использование этой комбинации приводит к нарушению структуры и читабельности нашего кода.

Чтобы улучшить удобочитаемость, мы подумали об использовании геопанд. Что такое геопанды? Из их документов:

GeoPandas - это проект с открытым исходным кодом, упрощающий работу с геопространственными данными в Python. GeoPandas расширяет типы данных, используемые пандами, чтобы позволить пространственные операции с геометрическими типами. Геометрические операции выполняются shapely.

Geopandas использует фреймы геоданных (очень похожие на фреймы данных pandas), что обеспечивает очень чистый способ выполнения пространственных вычислений. Геометрические операции выполняются в Shapely, поэтому нам не нужно беспокоиться о каких-либо изменениях в наших результатах. Кроме того, он имеет встроенный индекс Rtree (sindex), который можно использовать для индексации данных. Это было идеально для нас.

Ниже приведен фрагмент кода, чтобы найти состояние для любого местоположения. Индексирование и обработка данных упрощаются с помощью геопанд.

# state_gdf is the geodataframe containing state information
+------------+---------+
|state       | geometry|
+------------+---------+
def get_state_for_point(self, x, y):
    #x, y are coordinates in epsg:3857
    point_geom = Point(x, y)
    possible_matches_index = list(state_gdf.sindex.intersection(point_geom.bounds))
    possible_matches = state_gdf.loc[possible_matches_index]
    precise_matches = possible_matches.loc[possible_matches.contains(point_geom)]
    if precise_matches.empty:
        return None
    else:
        return precise_matches.iloc[0].state

Имея 160 исполнителей, Shapely и Rtree работали без сбоев, в то время как база данных OSM не могла предоставить соединения для обработки такого количества исполнителей.

Масштабируемость

Поскольку вычисления происходят локально, этот метод не имеет ограничений на количество исполнителей. Мы можем масштабировать наш искровой кластер в соответствии с нашими потребностями.

Раньше мы запускали наш конвейер с использованием 80 исполнителей. Чтобы протестировать новый подход, мы увеличили наш искровой кластер, чтобы использовать вдвое больше исполнителей. Имея 160 исполнителей, Shapely и Rtree работали без сбоев, в то время как база данных OSM не могла предоставить соединения для обработки такого количества исполнителей.

Более того, мы запустили наш конвейер с тем же количеством исполнителей, которое использовалось с OSM DB, и обнаружили, что время, затрачиваемое Shapely и Rtree, сопоставимо со временем, затрачиваемым OSM DB.

OSM DB (80 executors, 10 gb)
1486.45 sec
Shapely + Rtree (80 executors, 10 gb)
1703.88 sec
Shapely + Rtree (160 executors, 8 gb)
1057.63

Это решение также помогло нам найти правильное географическое соответствие для важных поездок, которые ранее не были сопоставлены с почтовыми индексами, а также снизило стоимость выделенной базы данных OSM.

Излишне говорить, что мы вполне довольны результатом.

Автор биографии

Ashutosh Goel is a final year student at BITS Pilani, pursuing a dual major in Computer Science and Economics. He currently works as an intern in the Customer Products team at Zendrive. 
Ashutosh likes to work on large-scale distributed systems. Python and spark are his favourite tools.

After completing his BTech from IIT Kanpur in 2012 and spending a couple of years working in the Photoshop Express at Adobe, Vishal Verma has been at Zendrive for 6 years now.
He has worked through every team and is currently leading the Customer Products team. An important aspect of his job is to make sure distributed jobs scale properly, and to have the foresight to fix things before they start to break.