Storm 是什么
Apache Storm is a free and open source distributed realtime computation system.
Apache Storm makes it easy to reliably process unbounded streams of data
Apache Storm is simple, can be used with any programming language
- 免费和开源的实时计算系统
- 易无边界实时处理数据
- 支持多种语言
Storm 能做什么
online machine learning, continuous computation, distributed RPC, ETL, and more
- Storm能实现高频数据和大规模数据的实时处理
Storm的发展历史
Storm诞生于BackType(被Twitter收购)公司。
Storm技术网站
storm.apache.org
github.com/apache/storm
en.wikipedia.org/wiki/Storm_(event_processor)
Storm vs Hadoop
- 数据源/处理领域
- storm处理实时数据,延迟性低
- Hadoop处理离线数据,时间长
- 处理过程
- storm: "topologies"
- Hadoop: "MapReduce jobs"
- 进程是否结束
- Storm运行不能停止,除非kill进程
- Hadoop完成一个一个的job,会结束
- 处理速度
Storm vs Spark Stream
- Spark Stream
- 不是真正的实时处理,其实是将输入的数据分
为多个小批次进行批处理。 - Spark提供一站式处理,离线,实时,机器学习
- 不是真正的实时处理,其实是将输入的数据分
- Storm
- 进行的真正的实时处理。
- 单纯为实时计算而生中
Storm 核心概念
-
Stream
- 流,由数据组成的数据流
- 消息流,抽象概念,无边界的tuple序列构成。
- Streams 可以用分布式可靠的方式将现有的stream转换为新的steam。
- 为stream提供转换的基本单元是
Spouts
,Bolts
。
-
Spout
- 产生数据的源头,从其他数据源读取数据,负责输入新的数据到
Topolgies
。 - 消息流的源头,Topology的消息生产者
- 产生数据的源头,从其他数据源读取数据,负责输入新的数据到
-
Bolt
- 处理数据,可能会产生新的流
- 消息处理单元,可以做数据的过滤、聚合、查询/写数据库的操作
-
Tuple
- Storm的数据模型,是一个已命名的值序列,其中的字段可以是任何类型的对象。
- Storm支持所有基本类型、字符串和字节数组作为Tuple的字段值,还包括是实现了序列化的对象类型。
- 消息/数据 传递的基本单位
- Topolgies中的每个节点都必须声明发出的Tuple的输出字段类型。
-
Topology
- 拓扑,由spout和bolt组成,是提交给Strom的顶级抽象体。
- Topolgies中的每个节点的执行都是并行的。且Topolgies的执行是永不停止的,除非被kill。
- Storm会自动重新分配任何失败的任务,而且,Storm不会有任何数据的丢失,即使机器宕机,信息被丢弃。主要是其所有状态都保存在Zookeeper或本地磁盘上。
- 拓扑结构的每个节点都包含计算逻辑,两个节点之间的连接指示数据应该怎样被传递。
-
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
all-my-code.jar
是项目打的jar包,其中得有全部的依赖,
org.apache.storm.MyTopology
是要提交的Topolgies。
Storm 集群的组成
- Storm 集群中有两种节点,分别为
master node
(主节点)
和worker node
(工作节点)。 -
master node
会运行一个后台线程,叫做Nimbus
,这和Hadoop的JobTracker
很相似。Nimbus
负责在集群中分发代码、分配任务和监视故障。 - 每个
worker node
都会运行后台线程Supervisor
, 意味着该后台线程是一个管理者的角色,但是他管理的是Nimbus
分配在它所在的机器上的worker processes
。 -
worker processes
执行的是topology
的一个子集,一个正在运行的Topology
包含着多个分布在多个机器的worker processes
-
Nimbus
和Supervisor
之间的所有协调都是交给一个Zookeeper
集群来做的。 -
Nimbus
和Supervisor
后台线程都是实现了fail-fast
和stateless
。所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以杀死Nimbus
和Supervisor
,他们会像什么都没发生一样重新开始。这种设计使得Storm cluster
非常稳定。-
fail-fast
是指在可能发生故障的系统中,先报道表明可能发生的错误,并且停止运行,而不会让系统冒着风险去运行。 -
stateless
是指无状态,不能保存数据,例如没有实例对象的对象,不能保存对象,是线程安全的。
-
Storm的Java API
一个简单的Topology实例
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
new TopologyBuilder()
得到build实例对象,通过该对象来定义新节点。
setSpout
, setBolt
两个方法分别是声明Spout和Bolt, 第一个参数是自定义的id,相当于节点的名字,应该是唯一的。
第二个参数是实现了固定接口的具体实体。第三个参数是指定了节点执行任务的并行度,默认为单线程。
shuffleGrouping
意味着,输入的Tuple随机的输入到任务中,类似混淆。
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
一个Bolt节点也可以从多个节点中获取数据,可以链接声明。
IComponent接口
Spout和Bolt都要实现的接口。
declareOutputFields
方法声明输出Stream的规则。
ISpout
ISpout
是Spout
的核心接口,当strom发现每个在DAG(有向无环图,即Topolgies
)的Tuple
都被成功处理后,就会向Spout发送ack
信息。
如果有一个Tuple
在Topolgies
规定的时间内未被完全处理,Storm将会发送fail
信息。- 当
Spout
发送一个tuple
,它会给tuple
标记一个message id
,当storm
发送ack
或者fail
信息时,spout
会通过这个id来标识是哪个tuple
。如果是没有标识的id的tuple,Storm将不会对它进行跟踪。- Storm执行
ack
、fail
、nextTuple
都在同一个线程中,这说明其保证了线程安全。因此必须保证nextTuple
是非阻塞的,否则会阻塞ack
和fail
。
open(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector)
当集群内的worker初始化Spout时调用该方法,它为该Spout提供了执行需要的环境。
close
停止Spout时将会被调用,但是通常情况下都不会调用该方法,因为集群通常使用kill -9
停止任务,本地模式下通常才调用该方法。
activate
Spout要进行工作时,即要执行nextTuple前,执行该方法将Spout激活。
deactivate
Spout在接下来时间里不会有工作执行的情况下,执行该方法休眠。
nextTuple
Storm请求Spout向输出收集器发送Tuple时调用该方法,该方法必须是非阻塞的,即便没有数据发出,这个方法也要返回。
ack(Object msgId)
Tuple发送成功,该方法接收Tuple的message id
。
fail(Object msgId)
同ack
,接收发送失败的Tuple的message id
。
IBolt
- IBolt消费Tuple后生产新的Tuple发送出去。但是它并不会在接收到一个Tuple后立刻就处理它。
- IBolt被客户端机器创建,然后通过Java序列化后放入
Topology
里,再被提交到Nimbus
。随后Nimbus
发起worker后,会将该IBolt反序列化(调用其中的prepare
),然后它才会开始处理Tuple
。
prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector)
它为该Bolt提供了执行需要的环境。
execute(Tuple input)
对Tuple进行处理。Tuple中包含着它来自哪个组件,可以通过Tuple#getValue方法来得到其中的value。而且,如上所说,IBolt并不会立刻处理数据。
cleanup
和ISpout的close类似,不一定被执行,该方法的功能是在IBolt被停止时清理数据。