Перебалансировка топологии Storm с использованием кода Java

Я пытаюсь перебалансировать свою топологию Storm, в которой используется KafkaSpout. Мой код:

    TopologyBuilder builder = new TopologyBuilder();
    Properties kafkaProps = new Properties();
    kafkaProps.put("zk.connect", "localhost:2181");
    kafkaProps.put("zk.connectiontimeout.ms", "1000000");
    kafkaProps.put("groupid", "storm");

    builder.setSpout( "kafkaSpout" , new KafkaSpout(kafkaProps, "test"), 3);
    builder.setBolt( "eventBolt", new EventBolt(), 2 ).shuffleGrouping( "kafkaSpout", "eventStream" );
    builder.setBolt( "tableBolt", new TableBolt(), 2 ).shuffleGrouping( "kafkaSpout", "tableStream");

    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_DEBUG, true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());

    Utils.sleep( 1000*5 );

    List<TopologySummary> topologySummaries = cluster.getClusterInfo().get_topologies();
    for ( TopologySummary summary : topologySummaries ) {
        StormTopology topology = cluster.getTopology( summary.get_id() );
        RebalanceOptions options = new RebalanceOptions();
        options.set_wait_secs( 0 );
        options.set_num_workers( 4 );

        for ( String name : topology.get_bolts().keySet() ) {
            System.err.println( name + "   " + topology.get_bolts().get(name).get_common().get_json_conf() );
            options.put_to_num_executors( name , 5);
        }
        for ( String name : topology.get_spouts().keySet() ) {
            System.err.println( name + "   " + topology.get_spouts().get(name).get_common().get_json_conf() );
            options.put_to_num_executors( name , 5);
        }

        cluster.rebalance( summary.get_name() , options);
    }

Однако во время повторной балансировки отображается следующая трассировка ошибки:

10341 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO  kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 begin rebalancing consumer storm_rishabh-1361473654345-95461d10 try #1
10341 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 begin rebalancing consumer storm_rishabh-1361473654345-3b26ed76 try #1
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 error during syncedRebalance
java.lang.NullPointerException: null
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na]
10342 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 error during syncedRebalance
java.lang.NullPointerException: null
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na]
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO  kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 stopping watcher executor thread for consumer storm_rishabh-1361473654345-95461d10
10343 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO  kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 stopping watcher executor thread for consumer storm_rishabh-1361473654345-3b26ed76

Может кто-нибудь, пожалуйста, скажите мне, в чем может быть проблема? Нужно ли мне определять что-то еще в kafkaSpout, чтобы во время перебалансировки он правильно закрывался, а затем запускался снова?


person Rishabh    schedule 21.02.2013    source источник


Ответы (2)


У меня была такая же проблема при работе в LocalCluster (для целей разработки). Я изменил свою тестовую конфигурацию YAML, чтобы уменьшить количество рабочих процессов до 1:

topology.workers: 1

Это исправило проблему. Я еще не пытался запустить это на реальном распределенном кластере, поэтому я не знаю, является ли это просто артефактом работы в режиме LocalCluster или нет.

(В моем коде я никогда не вызываю LocalCluster.rebalance.)

person cfeduke    schedule 26.03.2013

Используйте команду storm rebalance от супервизора или узла nimbus.

Например, мифология перебалансировки шторма -n 5 -e blue-spout=3 -e yellow-bolt=10.

Пожалуйста, обратитесь к этому сайту. www.michael- noll.com.

person Prabhu    schedule 28.11.2015