Создание носика Apache Storm, который выдает кортежи каждые X секунд

У меня есть топология, которая получает данные от брокера MQTT, и я хочу, чтобы носик вел себя следующим образом:

  1. Выдавать пакет кортежей (или список строк в одном кортеже) каждые x секунд. Как мне этого добиться? Я немного читал о Storm Trident, но его IBatchSpout, похоже, не позволяет мне генерировать кортежи в пакетном режиме с определенным интервалом времени.

  2. Что делать носителю, если новые данные не поступают? Он не может заблокировать поток, так как это основной поток Storm, верно?


person touchaponk    schedule 27.10.2014    source источник


Ответы (2)


Вы можете реализовать свой собственный носик MQTT. В качестве примера взгляните на МонгоСпаут.

Важной частью является метод nextTuple.

При вызове этого метода Storm запрашивает, чтобы Spout выдавал кортежи выходному сборщику. Этот метод должен быть неблокирующим, поэтому, если у Spout нет кортежей для отправки, этот метод должен возвращаться. nextTuple, ack и fail вызываются в тесном цикле в одном потоке в spout. задача. Когда нет кортежей для генерации, было бы вежливо оставить nextTuple бездействующим в течение короткого промежутка времени (например, одну миллисекунду), чтобы не тратить слишком много ресурсов ЦП.

Вы не должны ждать указанное время сразу, но вы можете реализовать nextTuple так, чтобы он выдавал кортеж только время от времени.

private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;

@Override
public void nextTuple() {
    if (lastEmission == null ||
            lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
        List<Object> tuple = pollMQTT();
        if (tuple != null) {
            this.collector.emit(tuple);
            return;
        }
    }
    Utils.sleep(50);
}

Обратите внимание, что я нашел носитель MQTT с открытым исходным кодом. Он не выглядит готовым к производству, но вы можете использовать его в качестве отправной точки.

person Christian Strempfer    schedule 28.10.2014

Помимо Кристиана, я нашел эту реализацию для Storm MQTT-клиент. Предыдущая упомянутая ссылка до сих пор не разработана.

person Zack S    schedule 20.01.2015