Топология подсчета слов Storm — проблема концепции с количеством выполнений

Добрый день, я слежу за Storm-starter WordCountTopology здесь. Для справки, вот файлы Java.

Это основной файл:

public class WordCountTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {

public SplitSentence() {
  super("python", "splitsentence.py");
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("word"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
  return null;
}
}

public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
  String word = tuple.getString(0);
  Integer count = counts.get(word);
  if (count == null)
    count = 0;
  count++;
  counts.put(word, count);
  collector.emit(new Values(word, count));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("word", "count"));
}
}

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new TextFileSpout(), 5);

builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

Config conf = new Config();
conf.setDebug(true);

if (args != null && args.length > 0) {
  conf.setNumWorkers(3);

  StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
else {
  conf.setMaxTaskParallelism(3);
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology("word-count", conf, builder.createTopology());
  Thread.sleep(10000);
  cluster.shutdown();
}
}
}

Вместо того, чтобы читать из случайной строки [], я хотел бы только одно чтение из одного предложения:

public class TextFileSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    String sentence = "";
    String line = "";
    String splitBy = ",";
    BufferedReader br = null;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;

    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        sentence = "wordOne wordTwo";
        _collector.emit(new Values(sentence));
        System.out.println(sentence);
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

Этот код работает, и на выходе много потоков/испусканий. Проблема в том, что программа многократно выполняет чтение одного предложения 85 раз, а не один раз. Я предполагаю, что это потому, что исходный код несколько раз выполняет новые случайные предложения.

Что заставляет NextTuple вызываться так много раз?


person user2494663    schedule 09.06.2015    source источник
comment
не могли бы вы поделиться своим кодом носика   -  person user2720864    schedule 10.06.2015
comment
@user2720864 user2720864 Общий код носика. Прости за это   -  person user2494663    schedule 10.06.2015


Ответы (1)


Вы должны переместить код инициализации файла с помощью метода in open , иначе каждый раз, когда вызывается nextTuple, ваш обработчик файла будет инициализирован.

РЕДАКТИРОВАТЬ:

внутри открытого метода сделайте что-то вроде

    br = new BufferedReader(new FileReader(csvFileToRead));

и тогда логика чтения файла должна быть внутри метода nextTuple

     while ((line = br.readLine()) != null) {
         // your logic
     }
person user2720864    schedule 10.06.2015
comment
Я переместил инициализацию файла в open. Сгенерированное предложение исправлено со всеми словами в файле, разделенными пробелом. Однако nextTuple вызывается 86 раз, поэтому мои подсчеты в 86 раз больше, чем должны быть. Я предполагаю, что это сужает всю мою проблему до выяснения того, как заставить nextTuple вызываться только один раз. Спасибо вам большое за ваше время. - person user2494663; 10.06.2015
comment
логика чтения должна быть внутри метода nextTuple, обновил мой ответ - person user2720864; 11.06.2015
comment
Спасибо за ваш ответ. Даже если я уберу всю часть чтения файла и просто составлю предложение из одного слова, nextTuple будет вызываться повторно 85 раз. Знаете ли вы, как в этом примере Storm решает, сколько раз запускать nextTuple? Может я где-то пропустил конфигурацию. Спасибо. - person user2494663; 11.06.2015
comment
Я упростил код, чтобы прочитать только одно предложение. Я просмотрел код и не могу понять, что вызывает NextTuple. Мне нужно, чтобы носик запускался один раз и возвращал wordOne: 1 и wordTwo: 1 вместо 85 и 85. Спасибо. - person user2494663; 12.06.2015
comment
Storm предназначен для потоковых источников, которые выдают данные по мере их появления. nextTuple() вызывается в бесконечном цикле, поэтому в вашем случае ему необходимо отслеживать свою позицию в вашем источнике данных. Он также должен отслеживать вызовы ack() и fail(), если вы хотите хотя бы один раз обработать. - person Joshua Martell; 14.06.2015