В части 1 этого блога я использую Google Cloud Platform (GCP) для создания конвейера данных с использованием Pub/Sub, Dataflow и Bigquery для автоматического мониторинга и хранения данных о прокате велосипедов TFL и погоде. Во второй части я буду использовать эти данные, чтобы разработать модель для прогнозирования использования велосипеда на основе прогнозов погоды и ежедневных/еженедельных временных трендов.
TFL (Транспорт для Лондона) предоставляет открытые API для различных типов транспортных данных. Я посмотрел на Bikepointконечную точку, которая предоставляет информацию о схеме проката велосипедов TFL, а также данные из API погоды (Powered by Dark Sky). Используя их, я создал конвейер данных, который будет обрабатывать и сохранять эти данные каждые десять минут в таблицу Big Query. Каждая строка представляет одну из 787 велосипедных станций в определенный момент времени и содержит информацию о количестве велосипедов, используемых в настоящее время, а также о текущих погодных условиях. Некоторые строки из конечного результата показаны здесь:
Данные
Ответ от Bikepoint API имеетформат json с указанием узла для каждой из 787 док-станций. Я извлекаю название дока, код, общее количество доков, количество доступных велосипедов и количество пустых доков. Я собрал переменные, которые, как я думал, повлияют на популярность проката велосипедов из API погоды, который также был в формате json: описание погоды, температура, облачность, влажность, интенсивность осадков, скорость ветра и видимость. Я также создал метку времени для каждой строки данных.
Конвейер данных
Сценарий Python, работающий на экземпляре Google Cloud Compute f1-micro (1 виртуальный ЦП, 0,6 ГБ памяти), извлекает данные из API Bikepoint и погоды каждые десять минут. Данные извлекаются и отправляются в виде строк в службу обмена сообщениями Cloud Pub/Sub. Затем Cloud Dataflow использует данные из Pub/Sub, обрабатывает их и передает в таблицу Big Query.
- Cloud/Pub Sub предоставляет надежную службу обмена сообщениями, которая разделяет отправителей и получателей.
- Cloud Dataflow обеспечивает автоматическое масштабирование пакетной и потоковой обработки данных с помощью Apache Beam SDK.
- Big Query — это бессерверное хранилище данных, позволяющее анализировать данные с помощью SQL-запросов.
Настройка таблицы BigQuery
Я создал таблицу BigQuery через консоль GCP с соответствующей схемой, используя тип данных TIMESTAMP, где это возможно, чтобы упростить анализ позже. Подробнее о создании таблиц BigQuery можно прочитать здесь.
Получение данных из API
Библиотека Python requests использовалась для получения данных из API путем запуска скрипта на микроэкземпляре Compute. Полученные jsons были преобразованы в набор списков и словарей. Вот первая часть скрипта Python для получения данных из API Bikepoint и Weather:
import requests import datetime import time from google.cloud import pubsub import os os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/philipmattocks/serviceaccount.json' colNames=["modifiedTime","callTime","_id","url","commonName","lat","lon","NbBikes","NbEmptyDocks","NbDocks","Weather_readingtime","Weather_summary","Wather_temperature","Weather_cloudCover","Weather_humidity","Weather_precipIntensity","Weather_windSpeed","Weather_visibility"] #Get data from TFL bikepoint API url = 'https://api.tfl.gov.uk/bikepoint' r = requests.get(url) data = r.json() #Get data from Darksky weather API url_weather = 'https://api.darksky.net/forecast/**API_KEY_REDACTED**/51.51,-0.13?units=si' r_weather = requests.get(url_weather) data_weather = r_weather.json()
Переменная data содержит список словарей, по одному для каждой из 787 док-станций, с такой информацией, как широта, долгота, общее количество доков, количество доступных велосипедов и количество пустых доков. Переменная data_weather — это словарь, содержащий данные о погоде.
Обработка данных и получение строк
Исходная переменная data была извлечена в 10 списков, по одному на каждую строку в таблице. Каждый из этих списков содержал 787 элементов, по одному на каждый док. По погодным условиям для каждого ряда были созданы соответствующие списки одинаковой длины:
#Get values from json modifiedTime=[d['additionalProperties'][1]['modified'] for d in data] callTime=[str(datetime.datetime.now())]*len(modifiedTime) lat=[d['lat'] for d in data] lon=[d['lon'] for d in data] _id=[d['id'] for d in data] url=[d['url'] for d in data] commonName=[d['commonName'] for d in data] NbBikes=[d['additionalProperties'][6]['value'] for d in data] NbEmptyDocks=[d['additionalProperties'][7]['value'] for d in data] NbDocks=[d['additionalProperties'][8]['value'] for d in data] #Weather stats for each row Weather_readingtime=[time.strftime("%Y-%m-%dT%H:%M:%S%Z", time.localtime(data_weather['currently']['time']))]*len(modifiedTime) Weather_summary=[data_weather['currently']['icon']]*len(modifiedTime) Weather_temperature=[str(data_weather['currently']['temperature'])]*len(modifiedTime) Weather_cloudCover=[str(data_weather['currently']['cloudCover'])]*len(modifiedTime) Weather_humidity=[str(data_weather['currently']['humidity'])]*len(modifiedTime) Weather_precipIntensity=[str(data_weather['currently']['precipIntensity'])]*len(modifiedTime) Weather_windSpeed=[str(data_weather['currently']['windSpeed'])]*len(modifiedTime) Weather_visibility=[str(data_weather['currently']['visibility'])]*len(modifiedTime)
Списки объединяются в 787 строк с помощью функции zip. Каждая строка представляет собой список словарей, который затем преобразуется в буквальную строку для отправки через Pub/Sub и облегчения последующей обработки. Каждая строка отправляется в Pub/Sub через клиентскую библиотеку Google Cloud Pub/Sub API:
#Send data to PubSub rows=list(zip(modifiedTime,callTime,_id,url,commonName,lat,lon,NbBikes,NbEmptyDocks,NbDocks,Weather_readingtime,Weather_summary,Weather_temperature,Weather_cloudCover,Weather_humidity,Weather_precipIntensity,Weather_windSpeed,Weather_visibility)) project_id = "tflbikeuse" topic_name = "test3" publisher = pubsub.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) #for event_data in rows: for i in range(0,len(rows)): publisher.publish(topic_path,data=str({k:v for k,v in zip(colNames,list(rows[i]))}).encode('utf-8') )
Ранее я создал тему Pub/Sub test3 в консоли Pub/Sub, подробности об этом см. здесь.
Сценарий Python был настроен на запуск по расписанию crontab каждые десять минут:
*/10 * * * * python3 /home/philipmattocks/tflbikestobigquery/APIs_PubSub.py
Использование Dataflow для извлечения из Pub/Sub и отправки в BigQuery
Чтобы убедиться, что Pub/Sub получает сообщения от API, я создал тестовую подписку на тему в пользовательском интерфейсе Pub/Sub и использовал следующую команду из Cloud Shell:
gcloud pubsub subscriptions pull --auto-ack mySub1 --limit=1
Это создало таблицу, содержащую строку данных, представленную в формате словаря Python.
Чтобы получать и обрабатывать сообщения с помощью Dataflow, я написал простой скрипт на Python, в котором использовался Apache Beam Python SDK. Скрипт получает строки данных из Pub/Sub, преобразует их в словари Python и сохраняет в BigQuery с помощью модуля apache_beam.io.gcp.bigquery. Полностью сценарий показан здесь:
#!/usr/bin/env python import apache_beam as beam import os import sys import ast os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/philipmattocks/serviceaccount.json' PROJECT='tflbikeuse' BUCKET='tflbikeuse' def convert_to_dict(datastring): datadict=ast.literal_eval(datastring) #print('converting to dict') yield datadict def run(): argv = [ '--project={0}'.format(PROJECT), '--job_name=frompubsubtobq', '--save_main_session', '--staging_location=gs://{0}/staging/'.format(BUCKET), '--temp_location=gs://{0}/staging/'.format(BUCKET), '--runner=DataflowRunner', '--streaming' ] p = beam.Pipeline(argv=argv) output_prefix = 'tflbikes' (p | 'getfromPubSub' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription='projects/tflbikeuse/subscriptions/mySub1') | 'convert_to_dict' >> beam.FlatMap(lambda line: convert_to_dict(line) ) | 'write_to_BQ' >> beam.io.gcp.bigquery.WriteToBigQuery('tflbikeuse:TFLBikes.test2') ) p.run() if __name__ == '__main__': run()
Для аутентификации в BigQuery я создал файл ключа сервисной учетной записи с доступом к проекту GCP, подробности см. здесь. Обычно это было бы установлено как переменная среды через оболочку bash, однако я обнаружил, что к ней нельзя получить доступ при запуске моего скрипта через crontab, поэтому я явно установил ее в самом скрипте, используя os.environ (см. приведенный выше код- блокировать).
Я запустил скрипт локально для тестирования перед запуском в облаке. При работе в облаке консоль Dataflow показывает каждый из шагов вместе с другой информацией:
Результаты
Теперь у меня есть таблица, содержащая информацию о прокате велосипедов для каждой док-станции в определенное время, а также текущие погодные условия:
Во второй части я попытаюсь создать модель, которая будет прогнозировать использование велосипеда в зависимости от погодных условий, времени суток, дня недели и т. д.