В части 1 этого блога я использую Google Cloud Platform (GCP) для создания конвейера данных с использованием Pub/Sub, Dataflow и Bigquery для автоматического мониторинга и хранения данных о прокате велосипедов TFL и погоде. Во второй части я буду использовать эти данные, чтобы разработать модель для прогнозирования использования велосипеда на основе прогнозов погоды и ежедневных/еженедельных временных трендов.

TFL (Транспорт для Лондона) предоставляет открытые API для различных типов транспортных данных. Я посмотрел на Bikepointконечную точку, которая предоставляет информацию о схеме проката велосипедов TFL, а также данные из API погоды (Powered by Dark Sky). Используя их, я создал конвейер данных, который будет обрабатывать и сохранять эти данные каждые десять минут в таблицу Big Query. Каждая строка представляет одну из 787 велосипедных станций в определенный момент времени и содержит информацию о количестве велосипедов, используемых в настоящее время, а также о текущих погодных условиях. Некоторые строки из конечного результата показаны здесь:

Данные

Ответ от Bikepoint API имеетформат json с указанием узла для каждой из 787 док-станций. Я извлекаю название дока, код, общее количество доков, количество доступных велосипедов и количество пустых доков. Я собрал переменные, которые, как я думал, повлияют на популярность проката велосипедов из API погоды, который также был в формате json: описание погоды, температура, облачность, влажность, интенсивность осадков, скорость ветра и видимость. Я также создал метку времени для каждой строки данных.

Конвейер данных

Сценарий Python, работающий на экземпляре Google Cloud Compute f1-micro (1 виртуальный ЦП, 0,6 ГБ памяти), извлекает данные из API Bikepoint и погоды каждые десять минут. Данные извлекаются и отправляются в виде строк в службу обмена сообщениями Cloud Pub/Sub. Затем Cloud Dataflow использует данные из Pub/Sub, обрабатывает их и передает в таблицу Big Query.

  • Cloud/Pub Sub предоставляет надежную службу обмена сообщениями, которая разделяет отправителей и получателей.
  • Cloud Dataflow обеспечивает автоматическое масштабирование пакетной и потоковой обработки данных с помощью Apache Beam SDK.
  • Big Query — это бессерверное хранилище данных, позволяющее анализировать данные с помощью SQL-запросов.

Настройка таблицы BigQuery

Я создал таблицу BigQuery через консоль GCP с соответствующей схемой, используя тип данных TIMESTAMP, где это возможно, чтобы упростить анализ позже. Подробнее о создании таблиц BigQuery можно прочитать здесь.

Получение данных из API

Библиотека Python requests использовалась для получения данных из API путем запуска скрипта на микроэкземпляре Compute. Полученные jsons были преобразованы в набор списков и словарей. Вот первая часть скрипта Python для получения данных из API Bikepoint и Weather:

import requests
import datetime
import time
from google.cloud import pubsub
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/philipmattocks/serviceaccount.json'
colNames=["modifiedTime","callTime","_id","url","commonName","lat","lon","NbBikes","NbEmptyDocks","NbDocks","Weather_readingtime","Weather_summary","Wather_temperature","Weather_cloudCover","Weather_humidity","Weather_precipIntensity","Weather_windSpeed","Weather_visibility"]
#Get data from TFL bikepoint API
url = 'https://api.tfl.gov.uk/bikepoint'
r = requests.get(url)
data = r.json()
#Get data from Darksky weather API 
url_weather = 'https://api.darksky.net/forecast/**API_KEY_REDACTED**/51.51,-0.13?units=si'
r_weather = requests.get(url_weather)
data_weather = r_weather.json()

Переменная data содержит список словарей, по одному для каждой из 787 док-станций, с такой информацией, как широта, долгота, общее количество доков, количество доступных велосипедов и количество пустых доков. Переменная data_weather — это словарь, содержащий данные о погоде.

Обработка данных и получение строк

Исходная переменная data была извлечена в 10 списков, по одному на каждую строку в таблице. Каждый из этих списков содержал 787 элементов, по одному на каждый док. По погодным условиям для каждого ряда были созданы соответствующие списки одинаковой длины:

#Get values from json
modifiedTime=[d['additionalProperties'][1]['modified'] for d in data]
callTime=[str(datetime.datetime.now())]*len(modifiedTime)
lat=[d['lat'] for d in data]
lon=[d['lon'] for d in data]
_id=[d['id'] for d in data]
url=[d['url'] for d in data]
commonName=[d['commonName'] for d in data]
NbBikes=[d['additionalProperties'][6]['value'] for d in data]
NbEmptyDocks=[d['additionalProperties'][7]['value'] for d in data]
NbDocks=[d['additionalProperties'][8]['value'] for d in data]
#Weather stats for each row
Weather_readingtime=[time.strftime("%Y-%m-%dT%H:%M:%S%Z", time.localtime(data_weather['currently']['time']))]*len(modifiedTime)
Weather_summary=[data_weather['currently']['icon']]*len(modifiedTime)
Weather_temperature=[str(data_weather['currently']['temperature'])]*len(modifiedTime)
Weather_cloudCover=[str(data_weather['currently']['cloudCover'])]*len(modifiedTime)
Weather_humidity=[str(data_weather['currently']['humidity'])]*len(modifiedTime)
Weather_precipIntensity=[str(data_weather['currently']['precipIntensity'])]*len(modifiedTime)
Weather_windSpeed=[str(data_weather['currently']['windSpeed'])]*len(modifiedTime)
Weather_visibility=[str(data_weather['currently']['visibility'])]*len(modifiedTime)

Списки объединяются в 787 строк с помощью функции zip. Каждая строка представляет собой список словарей, который затем преобразуется в буквальную строку для отправки через Pub/Sub и облегчения последующей обработки. Каждая строка отправляется в Pub/Sub через клиентскую библиотеку Google Cloud Pub/Sub API:

#Send data to PubSub
rows=list(zip(modifiedTime,callTime,_id,url,commonName,lat,lon,NbBikes,NbEmptyDocks,NbDocks,Weather_readingtime,Weather_summary,Weather_temperature,Weather_cloudCover,Weather_humidity,Weather_precipIntensity,Weather_windSpeed,Weather_visibility))
project_id = "tflbikeuse"
topic_name = "test3"
publisher = pubsub.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
#for event_data in rows:
for i in range(0,len(rows)):
    publisher.publish(topic_path,data=str({k:v for k,v in zip(colNames,list(rows[i]))}).encode('utf-8') )

Ранее я создал тему Pub/Sub test3 в консоли Pub/Sub, подробности об этом см. здесь.

Сценарий Python был настроен на запуск по расписанию crontab каждые десять минут:

*/10 * * * * python3 /home/philipmattocks/tflbikestobigquery/APIs_PubSub.py

Использование Dataflow для извлечения из Pub/Sub и отправки в BigQuery

Чтобы убедиться, что Pub/Sub получает сообщения от API, я создал тестовую подписку на тему в пользовательском интерфейсе Pub/Sub и использовал следующую команду из Cloud Shell:

gcloud pubsub subscriptions pull --auto-ack mySub1 --limit=1

Это создало таблицу, содержащую строку данных, представленную в формате словаря Python.

Чтобы получать и обрабатывать сообщения с помощью Dataflow, я написал простой скрипт на Python, в котором использовался Apache Beam Python SDK. Скрипт получает строки данных из Pub/Sub, преобразует их в словари Python и сохраняет в BigQuery с помощью модуля apache_beam.io.gcp.bigquery. Полностью сценарий показан здесь:

#!/usr/bin/env python
import apache_beam as beam
import os
import sys
import ast
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/philipmattocks/serviceaccount.json'
PROJECT='tflbikeuse'
BUCKET='tflbikeuse'
def convert_to_dict(datastring):
        datadict=ast.literal_eval(datastring)
        #print('converting to dict')
        yield datadict
def run():
    argv = [
    '--project={0}'.format(PROJECT),
    '--job_name=frompubsubtobq',
    '--save_main_session',
    '--staging_location=gs://{0}/staging/'.format(BUCKET),
    '--temp_location=gs://{0}/staging/'.format(BUCKET),
    '--runner=DataflowRunner',
    '--streaming'
    ]
    p = beam.Pipeline(argv=argv)
    output_prefix = 'tflbikes'
    (p
        | 'getfromPubSub' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription='projects/tflbikeuse/subscriptions/mySub1')
        | 'convert_to_dict' >> beam.FlatMap(lambda line: convert_to_dict(line) )
        | 'write_to_BQ' >> beam.io.gcp.bigquery.WriteToBigQuery('tflbikeuse:TFLBikes.test2')
    )
    p.run()
if __name__ == '__main__':
    run()

Для аутентификации в BigQuery я создал файл ключа сервисной учетной записи с доступом к проекту GCP, подробности см. здесь. Обычно это было бы установлено как переменная среды через оболочку bash, однако я обнаружил, что к ней нельзя получить доступ при запуске моего скрипта через crontab, поэтому я явно установил ее в самом скрипте, используя os.environ (см. приведенный выше код- блокировать).

Я запустил скрипт локально для тестирования перед запуском в облаке. При работе в облаке консоль Dataflow показывает каждый из шагов вместе с другой информацией:

Результаты

Теперь у меня есть таблица, содержащая информацию о прокате велосипедов для каждой док-станции в определенное время, а также текущие погодные условия:

Во второй части я попытаюсь создать модель, которая будет прогнозировать использование велосипеда в зависимости от погодных условий, времени суток, дня недели и т. д.