HTTP-клиент StreamSets

Я работаю с StreamSets в дистрибутиве Cloudera, пытаясь получить некоторые данные с этого веб-сайта http://files.data.gouv.fr/sirene/

Я столкнулся с некоторыми проблемами при выборе параметров как HTTP-клиента, так и места назначения Hadoop FS.

https://image.noelshack.com/fichiers/2017/44/2/1509457504-streamsets-f.jpg

Я получаю эту ошибку: HTTP_00 - Не удается разобрать запись: java.io.IOException: org.apache.commons.compress.archivers.ArchiveException: Архиватор не найден для подписи потока

Я покажу вам свою конфигурацию.

HTTP-клиент:

Общие

Имя: HTTP-клиент INSEE

Описание: Клиент HTTP SIRENE

При ошибке записи: отправить на ошибку

HTTP

URL ресурса: http://files.data.gouv.fr/sirene/

Заголовки : сирена_ : сирена_

Режим : Потоковое

Действия для каждого статуса

Код HTTP-статуса: 500 | Действие для статуса: Повторить попытку с экспоненциальной задержкой |

Базовый интервал задержки (мс): 1000 | Максимальное количество попыток: 10

HTTP-метод: ПОЛУЧИТЬ

Часовой пояс тела: UTC (UTC)

Запросить кодировку передачи: БУФЕРИРОВАНО

HTTP-сжатие: нет

Время ожидания подключения: 0

Тайм-аут чтения: 0

Тип аутентификации: нет

Используйте OAuth 2

Используй прокси

Максимальный размер пакета (записей): 1000

Время ожидания пакета (мс): 2000

Разбивка на страницы

Режим пагинации: нет

TLS

ИспользоватьTLS

Обработка времени ожидания

Действие по тайм-ауту: Немедленно повторить попытку

Максимальное количество попыток: 10

Формат данных

Формат даты: с разделителями

Формат сжатия: Архив

Шаблон имени файла в сжатом каталоге: *.csv

Тип формата разделителя: Пользовательский

Строка заголовка: с линией заголовка

Максимальная длина записи (символы): 1024

Разрешить дополнительные столбцы

Символ-разделитель: точка с запятой

Экранирующий символ : Другое \

Персонаж цитаты: Другой"

Тип корневого поля: список-карта

Строки для пропуска: 0

Разобрать NULL

Кодировка: UTF-8

Игнорировать управляющие символы

Назначение Hadoop FS:

Общие

Имя: Хадуп ФС 1

Описание: Запись в HDFS

Библиотека сцены: CDH 5.7.6

Производить события

Обязательные поля

Предварительные условия

При ошибке записи: отправить на ошибку

Выходные файлы

Тип файла: Весь файл

Префикс файлов

Каталог в шапке

Шаблон каталога: /user/pap/StreamSets/sirene/

Часовой пояс данных: UTC (UTC)

Основа времени: ${время:сейчас()}

Использовать атрибут рулона

Подтвердить разрешения HDFS: ВКЛ.

Пропустить восстановление файлов: ВКЛ.

Поздние записи

Ограничение времени поздней записи (сек): ${1 * HOURS}

Поздняя обработка записи: отправить сообщение об ошибке

Формат данных

Формат данных: весь файл

Выражение имени файла: ${record:value('/fileInfo/filename')}

Выражение разрешений: 777

Файл существует: перезаписать

Включить контрольную сумму в события

... так что я делаю неправильно? :(


person VincentM000    schedule 31.10.2017    source источник
comment
Возможно, это связано с тем, что первый файл из files.data.gouv.fr/sirene является README. .txt, а не CSV-файл в ZIP-архиве. Как игнорировать этот файл, поскольку в FTP-клиенте SFTP нет шаблона имени файла?   -  person VincentM000    schedule 31.10.2017


Ответы (2)


Похоже, http://files.data.gouv.fr/sirene/ возвращается список файлов, а не сжатый архив. Это сложный вопрос, поскольку нет стандартного способа перебора такого списка. Вы можете прочитать http://files.data.gouv.fr/sirene/ в виде текста, затем используйте оценщик Jython, чтобы проанализировать URL-адреса zip-файла, извлечь, распаковать и проанализировать их, добавив проанализированные записи в пакет. Я думаю, что у вас будут проблемы с этим методом, так как все записи окажутся в одном пакете, выбивая память.

Другой идеей может быть использование двух конвейеров: первый будет использовать источник HTTP-клиента и оценщик сценария для загрузки заархивированных файлов и записи их в локальный каталог. Затем второй конвейер будет считывать заархивированный CSV через источник каталога, как обычно.

Если вы решите попробовать, свяжитесь с сообществом StreamSets через один из наших каналов — см. https://streamsets.com/community

person metadaddy    schedule 01.11.2017

Я пишу оценщик Jython. Я не знаком с доступными константами/объектами/записями, представленными в комментариях. Я попытался адаптировать этот скрипт Python в оценщик Jython:

import re
import itertools
import urllib2
data = [re.findall(r'(sirene\w+.zip)', line) for line in open('/home/user/Desktop/filesdatatest.txt')]
data_list = filter(None, data)
data_brackets = list(itertools.chain(*data_list))
data_clean = ["http://files.data.gouv.fr/sirene/" + url for url in data_brackets]
for url in data_clean:
    urllib2.urlopen(url)

записи = [re.findall(r'(sirene\w+.zip)', запись) для записи в записях] дал мне это сообщение об ошибке SCRIPTING_05 - Ошибка сценария при обработке записи: javax.script.ScriptException: TypeError: ожидаемая строка или буфер , но попал в строку номер 50

filesdatatest.txt содержит такие вещи, как:

Listing of /v1/AUTH_6032cb4c2159474684c8df1da2e2b642/storage/sirene/  
Name    Size    Date  
../            
README.txt  2Ki     2017-10-11 03:31:57  
sirene_201612_L_M.zip   1Gi     2017-01-05 00:12:08  
sirene_2017002_E_Q.zip  444Ki   2017-01-05 00:44:58  
sirene_2017003_E_Q.zip  6Mi     2017-01-05 00:45:01  
sirene_2017004_E_Q.zip  2Mi     2017-01-05 03:37:42  
sirene_2017005_E_Q.zip  2Mi     2017-01-06 03:40:47  
sirene_2017006_E_Q.zip  2Mi     2017-01-07 05:04:04  

поэтому я знаю, как анализировать записи.

person VincentM000    schedule 02.11.2017