1 Storm概述
Storm 是一个实时的、分布式的、可靠的流式数据处理系统。它的工作就是委派各种组件分别
独立的处理一些简单任务。在 Storm 集群中处理输入流的是 Spout 组件,而 Spout 又把读取的
数据传递给叫 Bolt 的组件。Bolt组件会对收到的数据元组进行处理,也有可能传递给下一个 Bolt。
我们可以把 Storm 集群想象成一个由 Bolt 组件组成的链条集合,数据在这些链条上传输,而 Bolt
作为链条上的节点来对数据进行处理。
Storm 保证每个消息都会得到处理,而且处理速度非常快, 在一个小集群中,每秒可以处理数
以百万计的消息。 Storm 的处理速度非常惊人:经测试,每个节点每秒可以处理 100 万个数据
元组。其主要应用领域有实时分析、在线机器学习、持续计算、分布式 RPC(远过程调用协议,
一种通过网络从远程计算机程序上请求服务, 而不需要了解底层网络技术的协议。)、 ETL(数据
抽取、转换和加载)等。
Storm 和 Hadoop 集群表面看上去很类似,但是 Hadoop 上面运行的是 MapReduce Jobs,而在
Storm 上运行的是拓扑 Topology,这两者之间是非常不一样的,关键区别是: MapReduce 最终
会结束,而一个 Topology 永远会运行(除非你手动 kill 掉), 换句话说, Storm 是面向实时数据
分析,而 Hadoop 面向的是离线数据分析。
2 应用场景
1. 信息流处理
Storm 可用来实时处理新数据和更新数据库,兼具容错性和可扩展性。即 Storm 可以用来处理源源
不断流进来的消息,处理之后将结果写入到某个存储中去。
2. 连续计算
Storm 可进行连续查询并把结果即时反馈给客户端。比如把 Twitter 上的热门话题发送到浏览器中。
3. 分布式远程调用
Storm 可用来并行处理密集查询。 Storm 的拓扑结构是一个等待调用信息的分布函数,当它收到一
条调用信息后,会对查询进行计算,并返回查询结果。举个例子 Distributed RPC 可以做并行搜索
或者处理大集合的数据。
3 Storm特点
1. 编程模型简单
在大数据处理方面, Hadoop 为开发者提供了 MapReduce 原语,使并行批处理程序变得非常简单
和优美。同样, Storm 也为大数据的实时计算提供了一些简单优美的原语,这大大降低了开发并行
实时处理任务的复杂性,可以快速、高效的开发应用。
2. 可扩展
在 Storm 集群中真正运行 Topology 的主要有 3 个实体:工作进程、线程和任务。 Storm 集群中的
每台机器都可以运行多个工作进程,每个工作进程又可创建多个线程,每个线程可以执行多个任务,
任务是真正进行数据处理的实体,开发的 Spout、 Bolt 就是作为一个或者多个任务的方式执行的。
因此,计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。
3. 高可靠性
Spout 发出消息后, 可能会触发产生成千上万条消息,可以形象地理解为一颗消息树,其中 Spout
发出的消息为树根, Storm 会跟踪这颗消息树的处理情况,只有当这颗消息树中的所有消息都被处
理了, Storm 才会认为 Spout 发出的这个消息已经被“完全处理”。如果这颗消息树种的任何一个
消息处理失败了,或者整棵消息树在限定时间内没有“完全处理”,那么 Spout 发出的消息就会重
发。
4. 高容错性
如果在消息处理过程中出了一些异常, Storm 会重新安排这个出问题的处理单元。 Storm 保证一个
处理单元永远运行(除非显式杀掉该处理单元)。
5. 支持多种编程语言
除了用 Java 实现 Spout和 Bolt,还可以使用其他编程语言来完成这项工作,这一切得益于 Storm 的
多语言协议。多语言协议是 Storm 内部的一种特殊协议,允许 Spout或 Bolt 使用标准输入和标准输
出来传递消息,传递的消息为单行文本或多行 JSON编码的格式。
6. 支持本地模式
Storm 有一种“本地模式”,也就是在进程中模拟一个 Storm 集群的所有功能,以本地模式运行
Topology 与在集群上运行 Topology 类似,这对于开发和测试来说非常有用。
7. 高效
用 ZeroMQ 作为底层消息队列,保证消息能被快速处理。
4 集群架构与核心概念
Storm 的集群由一个主节点和多个工作节点组成。主节点运行一个名为“Nimbus”的守护进程,每个
工作节点都运行一个名为“Supervisor”的守护进程,两者的协调工作由 ZooKeeper 来完成,
ZooKeeper 用于管理集群中的不同组件。
4.1 主节点 Nimbus
主节点通常运行一个后台程序——Nimbus,用于响应分布在集群中的节点,分配任务和监测故障,
在某个节点的 Supervisor 出现故障宕机之后,如果在该节点上运行的 Worker 进程异常终止, Nimbus
会将异常终止的 Worker 进程分配到其他 Supervisor 节点上继续运行, 这类似于 Hadoop 中的
JobTracker。
4.2 工作节点 Supervisor
每一个工作节点上面运行一个叫做 Supervisor 进程。 Supervisor 负责监听从 Nimbus 分配给它执行的
任务, 还能保证正常运行的 Worker 异常终止之后能够重启该 Worker。 Nimbus 和 Supervisor 之间的
协调则通过 ZooKeeper 系统。
4.3 协调服务组件 Zookeeper
ZooKeeper 是完成 Nimbus 和 Supervisor 之间协调的服务。而应用程序实现实时的逻辑则被封装进
Storm 中的“topology”。 Topology 则是一组由 Spout(数据源)和 Bolts(数据处理)通过 Stream
Groupings 进行连接的图。
4.4 元组 Tuple
一次消息传递的基本单元。本来应该是一个 key-value 的 map,但是由于各个组件间传递的 tuple 的
字段名称已经事先定义好,所以 tuple 中只要按序填入各个 value 就行了,所以就是一个值列表。
4.5 工作进程 Worker
Worker 是一个 Java 进程,执行拓扑的一部分任务。一个 Worker 进程执行一个 Topology的子集,它
会启动一个或多个 Executor 线程来执行一个 Topology 的组件(Spout 或 Bolt),如下图
图1-4 Worker 结构图
4.6 线程 Task
Task 是运行 Spout 和 Bolt 的单元,每一个 Spout/Bolt 的线程称为一个 Task。在 Storm0.8 之后的版
本中, Task 不再与物理线程对应,同一个 Spout/Bolt 的 Task 可能会共享一个物理线程,该线程称为
Executor。
4.7 消息源 Spout
消息源 Spout 是 Storm 的 Topology中的消息生产者(即 Tuple 的创建者)。 Spout 向 Topology 中发
出的 Tuple 可以是可靠地,也可以是不可靠的。当 Storm 接收失败时,可靠地 Spout 会对 Tuple 进行
重发。而不可靠的 Spout 不会考虑成功是否只发射一次。消息源可以发射多条消息流 Stream,使用
OutputFieldsDeclarer.declareStream 来定义多个 Stream,然后使用 SpoutOutputCollector 来发射指1-4
定的 Stream。 Spout 中最主要的方法就是 nextTuple(),该方法会发射一个新的 Tuple 到 Topology,
如果没有新 Tuple发射则会简单地返回。要注意的是 nextTuple 方法不能阻塞,因为 Storm 在同一个
线程上面调用所有消息源 Spout 方法。另外两个比较重要的 Spout 方法是 ack 和 fail。 Storm 在检测
到一个 Tuple 被整个 Topology成功处理的时候调用 ack,否则调用 fail。 Storm 只对可靠的 Spout 调
用 ack 和 fail。
4.8 数据处理 Bolt
Bolt 是接收 Spout 发出元组 Tuple 后处理数据的组件,所有的消息处理逻辑被封装在 Bolt 中, Bolt
负责处理输入的数据流并产生输出的新数据流。 Bolt 可以执行过滤、函数操作、合并、写入数据库等
操作。 Bolt 还可以简单地传递消息流,复杂的消息流处理往往需要很多步骤,因此也就需要很多 Bolt
来处理。 Bolts 也可以发射多条小溪流,使用 OutputFieldsDeclare.declareStream 定义 Stream,使用
OutputCollector.emit 来选择要发射的 stream。而 Bolt 中最重要的方法是 execute(),以一个 Tuple
作为输入。 Bolt 使用 OutputCollector 来发射 Tuple。对 Tuple 的处理都可以放到此方法中进行,具体
的发送也是通过 emit 方法来完成的。
4.9 拓扑 Topology
Storm 的 Topology 指的是类似于网络拓扑图的一种虚拟结构。 Topology 类似于 MapReduce 任务,
一个关键的区别是 MapReduce 任务运行一段时间后最终会完成,而 Topology会一直运行,直到你手1-5
动 kill掉。一个 Topology是 Spouts和 Bolts组成的图,通过 Stream Groupings将图中的 Spouts和 Bolts
连接起来。
运行一个 Topology 很简单,首先把你所有的代码以及依赖的 jar 打进一个 jar 包,然后运行类似下面
这个命令:
Storm jar myCode.jar com.h3c.main.MyTopology arg1 arg2
这个命令会运行主类 com.h3c.main.MyTopology, 数是 arg1 跟 arg2。这个类的 main 函数定义这个
Topology 并且把他提交给 Nimbus。 Storm jar 负责连接 Nimbus 并且上传 jar 包。
Topology 的定义是一个 Thrift 结构,并且 Nimbus 就是一个 Thrift 服务,你可以提交任务语言创建的
Topology。提交 Topology 后,可以在 Storm UI界面查看整个 Topology 运行过程。
4.11 消息流 Stream
一个没有边界的、源源不断的、连续的 Tuple 序列就组成了 Stream,如下图。
图1-8 Stream 组织结构图
4.12 Stream Grouping
用来定义一个流如何分配 Tuple 到 Bolt。 Storm 包括 6 种流分组类型:
1. 随机分组(Shuffle grouping):随机分发 tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,
相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。1-6
3. 全部分组(All grouping): tuple 被复制到 Bolt 的所有任务。这种类型需要谨慎使用。
4. 全局分组(Global grouping):全部流都分配到 Bolt 的同一个任务。明确地说,是分配给 ID 最小
的那个 Task。
5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,
Storm 将把无分组的 Bolts 放到 Bolts 或 Spouts 订阅它们的同一线程去执行(如果可能)。
6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定 tuple由哪个元组处理
者任务接收。
Storm 还支持通过实现 CustomStreamGroupimg 接口来定制自己需要的分组。
4.13 Ack 机制
Storm 可以保证从 Spout 发出的每个数据都被完全处理,从 Spout 发出的数据可能会产生成千
上万的数据。一个 Tuple 被完全处理指:这个 Tuple 以及这个 Tuple 产生的所有 Tuple 都被成功
处理。而一个 Tuple 被认为处理失败是被是指在 timeout 时间内没有被成功处理(包括显示的 fail
和超时导致的失败)。这个 timeout 时间可以通过配置项 TOPOLOGY_MESSAGE_TIMEOUT_SECS
来设定, Timeout 的默认时长为 30 秒。
Storm 之所以有这样的能力是因为有个特殊的 Task 即 Acker ,他们负责跟踪 Spout 发出的每一
个 Tuple 的 Tuple 树。当 Acker 发现一个 Tuple 树已经处理完成了,它会发送一个消息给产生这
个 Tuple 的那个 Task。 Acker 的跟踪算法是 Storm 的主要突破之一,对任意大的一个 Tuple 树,
它只需要恒定的 20 字节就可以进行跟踪。
Acker 跟踪算法的原理: Acker 对于每个 Spout-tuple 保存一个 ack-val 的校验值,它的初始值是
0,然后每发射一个 Tuple 或 Ack 一个 Tuple 时,这个 Tuple 的 id 就要跟这个校验值异或一下,
并且把得到的值更新为 ack-val 的新值。那么假设每个发射出去的 Tuple 都被 ack 了,那么最后
ack-val 的值就一定是 0。 Acker 就根据 ack-val 是否为 0 来判断是否完全处理,如果为 0 则认为
已完全处理。