Storm ›Как интегрировать обратный вызов Java в Spout

Я пытаюсь интегрировать Storm (см. Здесь) в свой проект. Я изучаю концепции топологий, желобов и болтов. Но сейчас я пытаюсь понять, как на самом деле реализовать несколько вещей.

A) У меня многоязычная среда с Java и Clojure. Мой код Java - это класс обратного вызова с методами, запускающими потоковые данные. Данные о событиях, передаваемые в эти методы, - это то, что я хочу использовать в качестве носика.

Итак, первый вопрос: как связать данные, поступающие в эти методы, с носиком? Я пытаюсь i) передать backtype.storm.topology.IRichSpout, затем ii) передать backtype.storm. spout.SpoutOutputCollector (см. здесь) к функции открытия этого носика (см. здесь). Но я не вижу способа перейти к какой-либо карте или списку.

Б) Все остальное в моем проекте - Clojure. Через эти методы будет поступать много данных. Каждое событие будет иметь идентификатор от 1 до 100. В Clojure я хочу разделить данные, поступающие из носика, на разные потоки выполнения. Думаю, это будут болты.

Как я могу настроить болт Clojure, чтобы получать данные о событиях из носика, а затем прерывать поток на основе идентификатора входящего события?

Заранее спасибо Тим

[ИЗМЕНИТЬ 1]

Я действительно справился с этой проблемой. Я закончил тем, что 1) реализовал свой собственный IRichSpout. Затем я 2) подключил внутренний кортеж этого носика к данным входящего потока в моем классе обратного вызова java. Я не уверен, что это идиоматика. Но он компилируется и работает без ошибок. Однако 3) я не вижу данных входящего потока (определенно есть), проходящих через болт printstuff.

Есть ли что-то конкретное, что я должен сделать в реализации носика или болта или определении топологии, чтобы гарантировать распространение данных о событии? Спасибо.


      ;; tie Java callbacks to a Spout that I created
      (.setSpout java-callback ibspout)

      (storm/defbolt printstuff ["word"] [tuple collector]
        (println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
      )
      (storm/topology
       { "1" (storm/spout-spec ibspout)
       }
       { "3" (storm/bolt-spec  { "1" :shuffle }
                               printstuff
             )
       })

[РЕДАКТИРОВАТЬ 2]

По совету члена SO Ankur, я перенастраиваю свою топологию. После того, как я создал свой обратный вызов Java, я передаю его кортеж в IBSpout ниже, используя (.setTuple ibspout (.getTuple java-callback)). Я не передаю весь объект обратного вызова Java, потому что получаю ошибку NotSerializable. Все компилируется и работает без ошибок. Но опять же, на мой printstuff данные не поступают. Хм.


    public class IBSpout implements IRichSpout {

      /**
       * Storm spout stuff
       */
      private SpoutOutputCollector _collector;

      private List _tuple = new ArrayList();
      public void setTuple(List tuple) { _tuple = tuple; }
      public List getTuple() { return _tuple; }

      /**
       * Storm ISpout interface functions
       */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
      }
      public void close() {}
      public void activate() {}
      public void deactivate() {}
      public void nextTuple() {
        _collector.emit(_tuple);
      }
      public void ack(Object msgId) {}
      public void fail(Object msgId) {}


      public void declareOutputFields(OutputFieldsDeclarer declarer) {}
      public java.util.Map  getComponentConfiguration() { return new HashMap(); }

    }


person Nutritioustim    schedule 02.04.2013    source источник


Ответы (2)


Кажется, что вы передаете носик своему классу обратного вызова, что кажется немного странным. Когда выполняется топология, шторм будет периодически вызывать метод spouts nextTuple, поэтому вам нужно передать обратный вызов java в вашу собственную реализацию носика, чтобы, когда шторм вызывает ваш носик, носик вызывает обратный вызов java для получения следующего набора кортежи для подачи в топологию.

Ключевой концепцией, которую следует понять, является то, что Spouts извлекает данные по запросу шторма, а вы не отправляете данные в spouts. Ваш обратный вызов не может вызывать spout для передачи в него данных, скорее ваш spout должен извлекать данные (из некоторого java-метода или любого буфера памяти), когда вызывается метод nextTuple вашего носика.

person Ankur    schedule 03.04.2013
comment
О, классно. Спасибо за понимание. Но я все еще не вижу, чтобы данные поступали через носик к моему болту. Я дал лучшее описание выше. Может мне стоит заняться чем-то конкретным со своим Носиком? Есть ли особый способ передачи структуры данных в Spout? Спасибо. - person Nutritioustim; 03.04.2013
comment
@Nutritioustim Ты получил ответ? - person hawkeye; 22.04.2013
comment
Здарова. См. Выше. Я не мог заставить шторм делать то, что хотел. А Lamina был намного легче инструмент, который решил мою проблему. HTH. - person Nutritioustim; 22.04.2013

Ответ на часть Б:

Для меня простой ответ звучит так, будто вы ищете группировку полей, чтобы вы могли контролировать, какие работы группируются во время выполнения по идентификатору.

Тем не менее, я не уверен, что это действительно полный ответ, потому что я не знаю, почему вы пытаетесь сделать это таким образом. Если вам просто нужна сбалансированная рабочая нагрузка, лучше выбрать группировку в случайном порядке.

person G Gordon Worley III    schedule 02.04.2013
comment
Эй, спасибо, что посмотрели на это. Я действительно указал : shuffle, чтобы сбалансировать рабочую нагрузку. Проблема, с которой я столкнулся сейчас, заключается в том, что я не вижу, чтобы мои данные о событии распространялись на мой болт (см. Выше). Любое понимание здесь приветствуется. - person Nutritioustim; 02.04.2013
comment
@Nutritioustim вы действительно поняли, в чем проблема? - person Vor; 17.04.2013
comment
@Vor, Нет. Storm кажется слишком неработоспособным для того, что я пытаюсь сделать. На данный момент Lamina удовлетворяет мои потребности. HTH. - person Nutritioustim; 18.04.2013
comment
Спасибо за ответ, посмотрю - person Vor; 18.04.2013