Как остановить конвейер потоковой передачи в потоке данных в облаке Google

У меня запущен поток данных Streaming для чтения подписки PUB / SUB.

По прошествии определенного периода времени или, возможно, после обработки определенного количества данных, я хочу, чтобы конвейер остановился сам по себе. Я не хочу, чтобы мой экземпляр вычислительного движка работал бесконечно.

Когда я отменяю задание через консоль потока данных, оно отображается как неудавшееся.

Есть ли способ добиться этого? я что-то упускаю ? Или эта функция отсутствует в API.


person Bharathi    schedule 16.06.2015    source источник
comment
это почти звучит так, как будто вы не должны работать в потоковом режиме, а скорее в пакетном режиме. Какой у вас вариант использования, который вам нужно запустить в потоковом режиме?   -  person Graham Polley    schedule 16.06.2015
comment
Мне нужен потоковый режим, так как мой ввод осуществляется через PUB / SUB. Поскольку потоковая работа выполняется навсегда, я хочу ее остановить.   -  person Bharathi    schedule 16.06.2015
comment
Звучит странно, что вы решили разработать свое приложение с использованием pub / sub и средства запуска потоковой передачи, когда вы хотите, чтобы он останавливался после обработки X объема данных. Звучит как классическая партия. В любом случае я не вижу в API / SDK ничего, что могло бы отменить задание. Вы можете просто остановить / удалить виртуальные машины в рабочем пуле конвейера. Тогда это, вероятно, не удастся / отменит. Это поможет?   -  person Graham Polley    schedule 16.06.2015
comment
Мы уже рассматриваем возможность добавления варианта pub / sub источника для использования в пакетном режиме, аналогично тому, что предлагает Бхарати (читать в течение определенного времени или читать определенный объем данных) - это допустимый вариант использования, который хорошо сочетается с Идея Dataflow об объединении потоковой передачи и пакетной обработки.   -  person jkff    schedule 16.06.2015


Ответы (2)


Вы могли бы сделать что-нибудь подобное?

Pipeline pipeline = ...;
... (construct the streaming pipeline) ...
final DataflowPipelineJob job =
    DataflowPipelineRunner.fromOptions(pipelineOptions)
                          .run(pipeline);
Thread.sleep(your timeout);
job.cancel();
person jkff    schedule 16.06.2015
comment
Ах, вот как вы это отмените. Я наивно пытался найти cancel () в классе Pipeline. Хорошо знать. - person Graham Polley; 17.06.2015
comment
Не могли бы вы рассказать мне разницу между Pipeline.run () и DataflowPipelineRunner.fromOptions (pipelineOptions) .run (pipeline); Есть ли между ними разница. - person Bharathi; 23.06.2015
comment
Конвейеры потока данных могут запускаться с разными бегунами, используя Pipeline.setRunner - например, с DirectPipelineRunner, [Blocking] DataflowPipelineRunner, и в настоящее время существуют бегуны на Spark и Flink. У разных бегунов разные возможности. Если вы просто хотите запустить конвейер, вызовите pipeline.run (). Если вам нужны специфичные для бегуна возможности (например, DataflowPipelineRunner может отменять конвейеры), настройте / вызовите бегун напрямую, как в этом примере. - person jkff; 23.06.2015
comment
Это решение невозможно с новым SDK 2.x, потому что API изменился. В дополнение к этому режим потоковой передачи кажется обязательным для источника Pub / Sub в этой версии SDK. - person Thierry Falvo; 12.09.2018

Мне удалось слить (отменить задание без потери данных) запущенное задание потоковой передачи в потоке данных с помощью Rest API.

См. мой ответ

Используйте метод Rest Обновить с этим телом :

{"requiredState": "JOB_STATE_DRAINING"}

person Thierry Falvo    schedule 12.06.2018