Разделяйте данные, поступающие из CSV, чтобы я мог обрабатывать большие патчи, а не отдельные строки.

Я только начинаю работать с Google Data Flow. Я написал простой поток, который считывает CSV-файл из облачного хранилища. Один из шагов включает вызов веб-службы для обогащения результатов. Рассматриваемый веб-сервис работает намного лучше при массовой отправке нескольких 100 запросов.

Глядя на API, я не вижу отличного способа объединить 100 элементов PCollection в одно выполнение Par.do. Затем результаты необходимо будет разделить, чтобы обработать последний шаг потока, который записывает в таблицу BigQuery.

Не уверен, что мне нужно использовать окна, это то, что я хочу. Большинство примеров работы с окнами, которые я вижу, больше ориентированы на подсчет за определенный период времени.


person Jeffrey Ellin    schedule 11.05.2015    source источник


Ответы (2)


Вы можете буферизовать элементы в локальной переменной-члене вашего DoFn и вызывать свою веб-службу, когда буфер достаточно велик, а также в finishBundle. Например:

class CallServiceFn extends DoFn<String, String> {
  private List<String> elements = new ArrayList<>();

  public void processElement(ProcessContext c) {
    elements.add(c.element());
    if (elements.size() >= MAX_CALL_SIZE) {
      for (String result : callServiceWithData(elements)) {
        c.output(result);
      }
      elements.clear();
    }
  }

  public void finishBundle(Context c) {
    for (String result : callServiceWithData(elements)) {
      c.output(result);
    }
  }
}
person danielm    schedule 11.05.2015
comment
Каков наилучший способ вернуть данные обратно в конвейер. Моя служба возвращает список результатов в виде массива, в идеале я хотел бы разделить этот набор результатов на отдельные элементы. - person Jeffrey Ellin; 12.05.2015
comment
Я отредактировал свой пост, чтобы показать результаты вывода службы. - person danielm; 12.05.2015
comment
При запуске пакетного приложения запускается ли метод DoFn.finishBundle() при достижении определенного количества записей или это жизненный цикл для всего набора данных? Я предполагаю, что вы используете finishBundle для сбора любых оставшихся записей. - person Jeffrey Ellin; 12.05.2015
comment
FinishBundle вызывается в конце каждого набора элементов. Пакеты имеют неопределенный размер, но в пакетном конвейере примерно соответствуют доле данных одного рабочего потока. - person danielm; 12.05.2015
comment
Я пытаюсь воспроизвести этот код в python, но не знаю, как правильно очистить список. Например, в process(), если я делаю self.elements = [], он работает правильно, однако del self.elements[:] приведет к странным результатам. - person sthomps; 11.04.2016
comment
del self.elements[:] (или elements.clear() в примере с java) может привести к странным результатам, если callServiceWithData сохранит дескриптор elements за пределами времени жизни этого вызова. - person robertwb; 27.01.2017

Обратите внимание, что файл GroupIntoBatches было добавлено, чтобы сделать это еще проще.

person robertwb    schedule 18.07.2017