原文链接Storm Tutorial
本人原创翻译,转载请注明出处
这个教程内容包含如何创建topologies及部署到Storm集群上。Java是主要使用语言,但有的例子使用了Python,主要是为了解释Storm的多语言能力。
前言
本教程使用的例子来自storm-starter project。建议clone该project并照着练习。阅读Setting up a development environment和 Creating a new Storm project以使你的计算机具备开始条件。
这两篇文章本人已翻译,请阅Storm(一)打造开发环境&创建一个Storm项目
Storm集群的组件
表面上看Storm集群和Hadoop集群有些像。在Hadoop上运行的是"MapReduce jobs",而在Storm上运行的是topologies。"Jobs" 和 "topologies"大不相同,有一个关键不同就是MapReduce job最终会停止,而topology永不停止(除非被用户kill掉)。
Storm集群有两类节点:master节点和worker节点。master节点上运行着一个守护程序 "Nimbus"(和Hadoop的 "JobTracker"有些像)。Nimbus负责在集群中散布code,给各个机器分配任务以及监控失败的情况。每个worker节点上也运行着一个守护程序"Supervisor"。Supervisor负责接收Nimbus分配的任务,按需启动和停止worker进程。每个worker进程执行了一个topology的子集。一个运行中的topology包含了多个worker,这些worker分布在多个机器上。
所有Nimbus和Supervisors的协调都通过Zookeeper集群进行。此外,Nimbus和Supervisors是立即失败和无状态的(fail-fast and stateless)。所有的状态都保存在Zookeeper或本地硬盘上。这意味着即使通过kill -9 杀死Nimbus和Supervisors,他们也会自动恢复,这个设计给了Storm集群难以置信的稳定性。
Topologies
要利用Storm来进行实时计算,就要创建Topologies。一个topology是一个计算的图,topology 中的每个节点包含处理逻辑,topology 中的每个边(link)指明了数据如何在节点中传递。运行一个topology很简单,首先打包code和依赖到jar文件,然后执行以下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
这样就启动了org.apache.storm.MyTopology类,参数是arg1、arg2,这个类的主函数定义了 topology并提交到Nimbus。由于topology是Thrift structs,而且Nimbus是Thrift service,因此可以用任意的编程语言创建和提交topologies。这里举的是基于JVM语言的最简单的例子,更多信息请阅Running topologies on a production cluster
Streams
Storm 中最核心的抽象概念是"stream"。stream是元组(tuples)的无限序列,Storm提供了一种分布式、可靠的方式来将一个stream转化成一个新的stream。举个例子,你可以将一个tweets stream转化成一个trending topics stream。
Storm提供了"spouts" 和 "bolts"来完成stream的转化。通过实现Spouts和bolts的接口,你可以运行应用相关的逻辑。
spout是stream的来源,举个例子,spout可以从Kestrel队列中读取tuples并生成stream,或者spout也可以通过Twitter API生成一个tweets stream。
bolt可以消费任意数量的输入stream,做一些处理,很可能抛出新的stream。类似将tweets stream转化成trending topics stream这样的复杂转化,常常需要多个步骤,对应着多个bolt。Bolts可以做的事情很多,比如run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases等等。
spouts和bolts的网络被打包成了"topology",这是你提交到Storm集群执行的最高级别的抽象。 topology是一个stream转化的图,图的节点是spout或bolt,图的边指明了哪个bolt在订阅哪个stream。当一个spout 或 bolt发出一个tuple到一个stream,那么订阅了这个stream的所有bolt都会收到这个tuple。
Storm topology中的每个节点都是并行运行,在你的topology中,你可以指定各个节点的并行程度,Storm会按照你指定的数目在集群中启动相应数量的线程。
topology会永久执行下去,除非你停止它。Storm会自动重新分配失败的任务,此外,Storm保证数据不会丢失,即便机器宕机并且messages are dropped。
Data model
Storm的数据模型是tuple。tuple是一个命名list,字段可以是任意类型的对象。Storm支持所有primitive types, strings, and byte arrays作为tuple的字段值。如果要使用其他类型的对象,只需要实现serializer。
topology的每个节点都需要定义输出的tuple字段。如下的例子中,bolt定义了两个带有 "double"和"triple"字段的tuple。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
一个简单的topology
看一下storm-starter中ExclamationTopology的定义:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
这个Topology包含一个spout和两个bolt,spout发出words,bolt在输入的string后面加上"!!!"。节点被组织成一条直线:spout发出到第一个bolt,然后到第二个bolt。如果spout发出了tuples ["bob"]和["john"],那么第二个bolt会发出 ["bob!!!!!!"]和["john!!!!!!"]。
setSpout 和 setBolt方法定义了节点,第一个参数是用户定义的ID,第二个参数是处理逻辑对象,第三个参数是并发线程数。
spout的处理逻辑对象实现了IRichSpout接口,bolt的处理逻辑对象实现了IRichBolt接口。最后一个参数是可选的,如果不指定,Storm只分配一个线程。
setBolt返回一个InputDeclarer对象,这个对象定义了bolt的输入。这里组件exclaim1声明了它要读取所有组件words发出的tuples。组件exclaim2声明了它要读取所有组件exclaim1发出的tuples。"shuffle grouping"会被随机的分配到bolt。组件之间有多种分组数据的方式,如果想要组件exclaim2同时读取组件words和exclaim1的tuples,可以像这样定义:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
接下来看看这个topology的spout和bolt的实现。spout负责发出数据到topology。TestWordSpout每隔100ms从["nathan", "mike", "jackson", "golda", "bertels"]中发出随机word作为一个tuple,TestWordSpout的nextTuple()实现如下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
ExclamationBolt的实现如下:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
prepare方法给bolt提供了一个OutputCollector,用来从这个bolt发出tuple。Tuples可以在任何时候发出——包括prepare, execute, 或cleanup方法,甚至在另一个线程中异步发出。这里prepare方法只是保存OutputCollector为实例变量,后面execute方法会用到。
execute方法接收一个tuple,ExclamationBolt抓取tuple的第一个字段并在后面加上"!!!"。如果bolt订阅了多个输入源,可以通过Tuple#getSourceComponent方法查询tuple来源。
输入tuple被作为了emit的第一个参数,最后一行ack了输入tuple。这些是Storm可靠性API的一部分,用以保证没有数据丢失,本教程后续会进一步介绍。
cleanup方法在bolt停止的时候调用,在这里应该关闭所有打开的资源。Storm集群不保证一定会调用这个方法:例如,if the machine the task is running on blows up, there's no way to invoke the method. cleanup方法适用于local模式,你可以运行和停止许多topologies,不必担心资源泄露。
declareOutputFields方法声明了ExclamationBolt发出带一个名为"word"字段的1-tuples。
getComponentConfiguration方法允许你配置组件运行的参数。这是一个更高级的主题Configuration
在一个bolt的实现中,像cleanup 和 getComponentConfiguration这样的方法常常不需要。你可以通过继承提供了默认实现的基类来更简洁的实现bolt。例如:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
在local模式下运行ExclamationTopology
在local模式中,Storm完全在一个进程内运行,worker通过线程模拟,主要用于测试和开发场景。当你运行storm-starte中的topologies,他们将在local模式下运行,你能够看到每个组件正在发出的消息。
更多local模式信息请阅Local mode
更多分布式模式信息请阅running topologies in local mode on Local mode
如下是local模式下运行ExclamationTopology的代码:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先,通过创建LocalCluster对象定义了一个进程内的集群。提交topologies到这个虚拟集群和提交到分布式集群是完全一样的操作。submitTopology方法的第一个参数是topology的名字,第二个参数是topology的配置,第三个参数是topology对象。
这里topology的配置很常见:
TOPOLOGY_WORKERS (由setNumWorkers设置) 指定了你想分配多少进程来执行这个topology。 topology中的每个组件将以线程的形式执行,线程的数量由setBolt 和 setSpout配置。
TOPOLOGY_DEBUG (由setDebug设置) 当设置为true时, Storm记录组件发出的每个消息。
更多配置信息请阅the Javadoc for Config.
Stream groupings
stream grouping用于描述组件之间如何发送tuple。在集群中,spouts 和 bolts总是并行执行任务,在执行任务层面上,一个topology看起来如下图所示:
当Bolt A的一个任务发出tuple到Bolt B时,应该发到Bolt B的哪个任务呢?
stream groupings就是用来解决这个问题。在深入了解不同种类的stream groupings之前,我们先看看storm-starter中的另一个topology。WordCountTopology从spout中读取句子,WordCountBolt统计单词出现的次数。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentence发出句子中的单词作为tuple,WordCount以map的形式存储单词出现的次数,每次WordCount收到单词,就更新map并发出新的单词数目。
最简单的grouping方式是"shuffle grouping",也就是随机发送tuple给任务。"fields grouping"是一个更有趣的grouping方式,这里用在了SplitSentence bolt 和 WordCount bolt之间。对WordCount bolt来说,相同的单词应该发送到相同的任务,否则多个任务都会收到同样的单词,由于每个任务的信息都不完全,他们可能会发出错误的单词数目。fields grouping用字段的子集来分组,字段值相同的tuple被发送到相同的任务。
Fields groupings是实现streaming joins 和 streaming aggregations,它的底层实现利用了mod hashing。还有一些其他的分组方式,请阅Concepts。
使用其他编程语言定义Bolts
Bolts可以使用任意语言定义,用JVM-based语言以外定义的Bolts以子进程的方式运行,Storm以stdin/stdout之上的JSON格式消息与子进程通信。通信协议用到了一个100行左右的适配器库,支持Ruby, Python, Fancy。
WordCountTopology的SplitSentence定义如下:
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
SplitSentence重写了ShellBolt,声明它使用python,参数是splitsentence.py。splitsentence.py实现如下:
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()
更多多语言信息,请阅Using non-JVM languages with Storm
确保消息被处理
这部分属于Storm's reliability API的内容:Storm如何保证spout发出的消息都能被处理,请阅Guaranteeing Message Processing
Transactional topologies
Storm保证每个消息都至少被处理一次。一个常见的问题是:Storm会不会overcount?Storm提供了一种机制,确保消息只被传递一次。transactional topologies。
Distributed RPC
除了本教程已经展示的功能,Storm还可以做许多事。其中最有趣的应用之一是Distributed RPC。