Перенести ваши данные из Cloud Storage или Campaign Manager в BigQuery очень просто, поскольку BigQuery поддерживает их. Но что, если вы хотите перенести свои данные, например, из Общий диск Google?

В этой статье я постараюсь показать вам, как перенести данные в BQ из:

  • Google Storage (и почему это просто)
  • Google Shared Drive (и почему это сложнее) и, наконец,
  • Общий диск Google с особыми требованиями при загрузке (например, файлы с необычной кодировкой)

Применение: переместите файлы CSV из хранилища в Big Query.

Что нужно сделать:
→ переместить файлы CSV, находящиеся в папках с годами.

→ пропустить некоторые ненужные таблицы, передав список tables_to_skip List
→ проверить, совпадает ли количество строк в файле с загруженным в BQ

Пара более важных частей кода:

  1. Создайте конфигурацию задания загрузки BQ (ваши параметры для правильного приема данных из хранилища)

2. Загрузить данные из хранилища

3. Загрузите данные в Big Query

Весь код вы можете найти в моем репозитории GitHub:
(не забудьте добавить .env файл с путем к вашим учетным данным Google JSON — GOOGLE_APPLICATION_CREDENTIALS )

Как я уже говорил вам → легко-легко.

Проблема заключается в том, что вы хотите переместить свои данные, например. Общий диск Google, который «в настоящее время не поддерживается»:

Вы можете запросить файл на Диске как внешнюю таблицу в BQ:

CREATE TABLE my_dataset.my_non_external_table 
AS SELECT * FROM my_dataset.my_external_table


Но в таком случае у вас нет большого контроля над форматом данных при загрузке — например, у меня была проблема с columnID. Этот столбец выглядит как INT 067872, но это идентификатор транзакции — это должна быть строка. К сожалению, BQ обрабатывает его как INT и ест 0 в начале. Вы можете явно указать схему, но после (при загрузке в таблицу BQ), а не перед загрузкой.

Сначала я попробовал, как описано в этой записи блога, предложенной в ответе на мой вопрос:

Но возникла вторая проблема — BQ позволяет загружать df только в двух кодировках:
→ UTF-8
→ ISO-8859–1

Один из файлов, которые мне нужно передать, закодирован в CP1250.

Поэтому, если у вас есть правильная кодировка, вы можете загрузить ее как внешнюю таблицу и пропустить оставшуюся часть статьи. Если вы хотите иметь полный контроль над загрузкой данных, пожалуйста, продолжайте.

Итак, я положил новый вопрос в стек, а тем временем решил:

Мне нужно создать свой собственный разъем Drive → BQ.

Пример использования: переместите в Big Query все CSV-файлы, загруженные на общий диск Google.

предположения:
→ только файлы, загруженные (или измененные) со вчерашнего дня
→ только файлы в заданном списке папок
→ без «выброшенных» файлов (то, что не так очевидно — «мусорный» файл все еще может быть загружен вчера и находиться в отслеживаемой папке)
→ схема данных CSV и загруженных файлов в BQ всегда должна быть одинаковой

Я рекомендую вам ознакомиться с Introduction to Google Drive API.

  1. Во-первых, вам нужно подготовить файл конфигурации. Какие папки вы хотите отслеживать (все_папки) и схема ваших CSV-файлов:

Вы будете использовать его при загрузке CSV с Диска в фрейм данных Pandas, поэтому любые конфигурации, такие как encoding, skiprows, parse_dates, dayfirst, dtype , разделитель или десятичный более чем приветствуются.

2. Затем немного инициализации. Получите учетные данные (с областью действия для Диска и BigQuery) на основе GOOGLE_APPLICATION_CREDENTIALS для подключения к API Диска и BQ. Создайте Python-представление API (self.service) для Drive, клиента BQ и инициализируйте класс SendMail для отправки оповещений по электронной почте (мое решение, но вы, очевидно, можете использовать любой способ оповещения) о завершенные загрузки и ошибки.

3. Получить идентификатор общего диска на основе заданной строки (self.shared_drive_name) с помощью Поиск файлов и папок. Нужно будет искать файлы.

4. Получить идентификаторы папок — нужно будет выбрать файлы только из нужных папок.
Поля основаны на Справочнике файлов.

5. Создайте словарь с идентификатором папки (с диска) для сопоставления имени папки. Это будет выглядеть так:

folders_dict = {'folder_id': 'folder_name'}

6. Получить идентификаторы предметов. Создать поисковый файловый запрос только для файлов, загруженных/измененных со вчерашнего дня, только CSV, и есть, например, возможность искать только файлы в определенной папке (передавая folder_id)

7. Затем перебрать все найденные файлы.

Проверьте, не попал ли файл в корзину, проверьте, находится ли папка (родительская) в нашем списке разыскиваемых папок. Если запрос будет более строгим, чем мой, такую ​​проверку можно пропустить. Например, вы можете определить идентификатор родителя, а затем проверять, есть ли у файла правильный родитель, не нужно. Я хочу перебрать почти все файлы на Диске, поэтому для меня это лучший вариант.

Затем замените пробелы в именах папок на «_» (BQ не позволяет создавать таблицы с пробелами), проверьте, не загружен ли уже CSV в BQ, а затем начните процесс загрузки → на основе file_id. В конце отправьте письмо с подтверждением обо всех успешно загруженных файлах.

8. Затем нам нужно загрузить CSV в кадр данных pandas на основе file_id.

Загрузить файл, хранящийся на Google Диске, затем передать параметры, созданные на основе файла конфигурации.

9. Затем некоторая очистка (подготовка) фрейма данных

Здесь важно то, что иногда значения с плавающей запятой сохраняются как строка, с запятыми вместо точек и с пробелами (например, 7 931,29) → потом сложно загрузить как с плавающей запятой. Нам нужно загрузить его как строку, а затем преобразовать в значения с плавающей запятой. Кроме того, я добавляю текущую временную метку (чтобы узнать, когда файл был загружен в BQ) и имя_файла, чтобы проверить, загружен ли этот файл уже в BQ. Вы должны сделать свой очищающий фрейм данных здесь.

10. Затем создайте схему для загрузки таблицы на основе файла конфигурации.

Нам нужно сопоставить типы python с BQ и обработать столбцы даты. Затем на основе типов, переданных при загрузке фрейма данных, мы создаем схему для загрузки таблицы в Big Query.

10. Затем загрузите его в Big Query.

Будьте добры к BQ API — не отправляйте слишком много запросов. Поэтому реализован счетчик. Конфигурация задания BQ такая же, как и в Storage Connector.

И вуаля!

Я знаю, что в моем коде есть место для оптимизации, просто хотел показать вам путь. Надеюсь, вы будете использовать часть этого со своим самодельным коннектором, и, возможно, я сэкономлю вам немного времени :)

Репозиторий GitHub со всем кодом.

Ваше здоровье!