在创建Storm的Topology时,我们通常使用如下代码:
builder.setBolt("cpp",new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name);
Config conf = new Config();
conf.setNumWorkers(3);
参数1:bolt名称 "cpp"
参数2:bolt类型 CppBolt
参数3:bolt的并行数,parallelismNum,即运行topology时,该bolt的线程数
setNumTasks() 设置bolt的task数
noneGrouping() 设置输入流方式及字段
conf.setNumWorkers()设置worker数据。
经过多次试验总结,得出如下结论:
1)Topology的worker数通过config设置,即执行该topology的worker(java)进程数。它可以通过storm rebalance 命令任意调整。
- Topology中某个bolt的executor数,即parallelismNum,即执行该bolt的线程数,在setBolt时由第三个参数指定。它可以通过storm rebalance 命令调整,但最大不能超过该bolt的task数;
- bolt的task数,通过setNumTasks()设置。(也可不设置,默认取bolt的executor数),无法在运行时调整。
4)Bolt实例数,这个比较特别,它和task数相等。有多少个task就会new 多少个Bolt对象。而这些Bolt对象在运行时由Bolt的thread进行调度。
也即是说
builder.setBolt("cpp",newCppBolt(),3).setNumTasks(5).noneGrouping(pre_name);
会创建3个线程,但有内存中会5个CppBolt对象,三个线程调度5个对象。
一个运行中的拓扑的例子
下 面的图表展示了1个简单拓扑在实际操作中看起来是怎样的。这个拓扑包含了3个组件:
1个spout叫做BlueSpout,
2个bolt分别叫 GreenBolt和YellowBolt。BlueSpout发送它的输出到GreenBolt,GreenBolt又把它的输出发到 YellowBolt。
下面是对上图的简要分析:
3个组件的并发度加起来是10,就是说拓扑一共有10个executor,一共有2个worker,每个worker产生10 / 2 = 5条线程。
绿色的bolt配置成2个executor和4个task。为此每个executor为这个bolt运行2个task。
下面的代码配置了这3个组件,相关代码如下:
Config conf = new Config();
conf.setNumWorkers(2); // 使用2个worker进程
topologyBuilder.setSpout(“blue-spout”, new BlueSpout(), 2); // parallelism hint为2
topologyBuilder.setBolt(“green-bolt”, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(“blue-spout”);
topologyBuilder.setBolt(“yellow-bolt”, new YellowBolt(), 6) .shuffleGrouping(“green-bolt”);
StormSubmitter.submitTopology( “mytopology”, conf, topologyBuilder.createTopology() );
And of course Storm comes with additional configuration settings to control the parallelism of a topology, including:
此外还有其他的配置来控制拓扑的并发度,包括了:
TOPOLOGY_MAX_TASK_PARALLELISM: 这个设置指定了1个单独的组件的executor的数量的上限。当在测试阶段使用本地模式运行1个拓扑时,用来限制生成的线程的数量。你可以像下面这样来使用:
Config#setMaxTaskParallelism()
如何改变1个正在运行的拓扑的并发度
Storm有一个不错的特性,你可以在不需要重启集群或拓扑,来增加或减少worker进程和executor的数量。这样行为成为rebalancing。
你有两个选项可以rebalance1个拓扑:
- 使用Storm的web UI来rebalance。
- 像下面描述的那样,使用命令行工具来做:
# 重新配置拓扑 “mytopology” 使用5个worker进程。
# spout “blue-spout” 使用3个executor
# bolt “yellow-bolt” 使用10个executor
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10