Я пытаюсь интегрировать Storm (см. Здесь) в свой проект. Я изучаю концепции топологий, желобов и болтов. Но сейчас я пытаюсь понять, как на самом деле реализовать несколько вещей.
A) У меня многоязычная среда с Java и Clojure. Мой код Java - это класс обратного вызова с методами, запускающими потоковые данные. Данные о событиях, передаваемые в эти методы, - это то, что я хочу использовать в качестве носика.
Итак, первый вопрос: как связать данные, поступающие в эти методы, с носиком? Я пытаюсь i) передать backtype.storm.topology.IRichSpout, затем ii) передать backtype.storm. spout.SpoutOutputCollector (см. здесь) к функции открытия этого носика (см. здесь). Но я не вижу способа перейти к какой-либо карте или списку.
Б) Все остальное в моем проекте - Clojure. Через эти методы будет поступать много данных. Каждое событие будет иметь идентификатор от 1 до 100. В Clojure я хочу разделить данные, поступающие из носика, на разные потоки выполнения. Думаю, это будут болты.
Как я могу настроить болт Clojure, чтобы получать данные о событиях из носика, а затем прерывать поток на основе идентификатора входящего события?
Заранее спасибо Тим
[ИЗМЕНИТЬ 1]
Я действительно справился с этой проблемой. Я закончил тем, что 1) реализовал свой собственный IRichSpout. Затем я 2) подключил внутренний кортеж этого носика к данным входящего потока в моем классе обратного вызова java. Я не уверен, что это идиоматика. Но он компилируется и работает без ошибок. Однако 3) я не вижу данных входящего потока (определенно есть), проходящих через болт printstuff.
Есть ли что-то конкретное, что я должен сделать в реализации носика или болта или определении топологии, чтобы гарантировать распространение данных о событии? Спасибо.
;; tie Java callbacks to a Spout that I created (.setSpout java-callback ibspout) (storm/defbolt printstuff ["word"] [tuple collector] (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) ) (storm/topology { "1" (storm/spout-spec ibspout) } { "3" (storm/bolt-spec { "1" :shuffle } printstuff ) })
[РЕДАКТИРОВАТЬ 2]
По совету члена SO Ankur, я перенастраиваю свою топологию. После того, как я создал свой обратный вызов Java, я передаю его кортеж в IBSpout ниже, используя (.setTuple ibspout (.getTuple java-callback))
. Я не передаю весь объект обратного вызова Java, потому что получаю ошибку NotSerializable. Все компилируется и работает без ошибок. Но опять же, на мой printstuff данные не поступают. Хм.
public class IBSpout implements IRichSpout { /** * Storm spout stuff */ private SpoutOutputCollector _collector; private List _tuple = new ArrayList(); public void setTuple(List tuple) { _tuple = tuple; } public List getTuple() { return _tuple; } /** * Storm ISpout interface functions */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() {} public void activate() {} public void deactivate() {} public void nextTuple() { _collector.emit(_tuple); } public void ack(Object msgId) {} public void fail(Object msgId) {} public void declareOutputFields(OutputFieldsDeclarer declarer) {} public java.util.Map getComponentConfiguration() { return new HashMap(); } }