这是一个JStorm使用教程,不包含环境搭建教程,直接在公司现有集群上跑任务,关于JStorm集群环境搭建,后续研究完会考虑额外写一篇博客。
你如果想了解JStorm是什么,有多牛逼什么什么的,请看最后的参考博客链接,里面有各种版本的介绍,我就不在这里总结这种东西了,我相信这些东西你第一次接触的时候会看,等学了JStorm之后也不会再去看这些东西了。。。
简介
-
JStorm和MapReduce的一些对比
一些关键概念
- nimbus:主控节点运行Nimbus守护进程,类似于Hadoop中的ResourceManager,负责在集群中分发代码,对节点分配任务,并监视主机故障。
- supervisor:每个工作节点运行Supervisor守护进程,负责监听工作节点上已经分配的主机作业,启动和停止Nimbus已经分配的工作进程,类似于Hadoop中的NodeManager。
supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。
在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。 - worker:Worker是具体处理Spout/Bolt逻辑的进程,根据提交的拓扑中conf.setNumWorkers(3);定义分配每个拓扑对应的worker数量,Storm会在每个Worker上均匀分配任务,一个Worker只能执行一个topology,但是可以执行其中的多个任务线程。
- task:任务是指Worker中每个Spout/Bolt线程,每个Spout和Bolt在集群中会执行许多任务,每个任务对应一个线程执行,可以通过TopologyBuilder类的setSpout()和setBolt()方法来设置每个Spout或者Bolt的并行度。
- Executor:Task接收到任务就是在Executor中执行的,可以理解为执行Task专门的一个线程。
- topology:Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排、容纳一组计算逻辑组件(Spout、Bolt)的对象(Hadoop MapReduce中一个Job包含一组Map Task、Reduce Task),这一组计算组件可以按照DAG图的方式编排起来(通过选择Stream Groupings来控制数据流分发流向),从而组合成一个计算逻辑更加负责的对象,那就是Topology。一个Topology运行以后就不能停止,它会无限地运行下去,除非手动干预(显式执行bin/storm kill )或意外故障(如停机、整个Storm集群挂掉)让它终止。
- spout:Storm中Spout是一个Topology的消息生产的源头,Spout应该是一个持续不断生产消息的组件,例如,它可以是一个Socket Server在监听外部Client连接并发送消息,可以是一个消息队列(MQ)的消费者、可以是用来接收Flume Agent的Sink所发送消息的服务,等等。Spout生产的消息在Storm中被抽象为Tuple,在整个Topology的多个计算组件之间都是根据需要抽象构建的Tuple消息来进行连接,从而形成流。
- bolt:Storm中消息的处理逻辑被封装到Bolt组件中,任何处理逻辑都可以在Bolt里面执行,处理过程和普通计算应用程序没什么区别,只是需要根据Storm的计算语义来合理设置一下组件之间消息流的声明、分发、连接即可。Bolt可以接收来自一个或多个Spout的Tuple消息,也可以来自多个其它Bolt的Tuple消息,也可能是Spout和其它Bolt组合发送的Tuple消息。
- tuple:JStorm中信息传输的单位,Storm程序是无限执行下去的,数据流是无止境的,但是每次驱动程序执行的只是一个数据流单位,就是Tuple,Spout的一次nextTuple以及Bolt的一次execute的执行操作的都是一个Tuple。Tuple只要是任意可序列化对象即可。
生命周期
Topology生命周期
- 上传代码并做校验(/nimbus/inbox);
- 建立本地目录(/stormdist/topology-id/);
- 建立zookeeper上的心跳目录;
- 计算topology的工作量(parallelism hint),分配task-id并写入zookeeper;
- 把task分配给supervisor执行;
- 在supervisor中定时检查是否有新的task,下载新代码、删除老代码,剩下的工作交个小弟worker;
- 在worker中把task拿到,看里面有哪些spout/Bolt,然后计算需要给哪些task发消息并建立连接;
-
在nimbus将topology终止的时候会将zookeeper上的相关信息删除;
Spout生命周期
提交时
- 构造方法:初始化构造参数,其中包含的必须都是可序列化的
- getComponentConfiguration:获取该类特殊的配置参数,只和该组件相关的配置,通常
return null
- declareOutputFields:获取该组件会输出的流、字段列表,其后续的其他组件订阅相应的流或者字段需要和这里对应,否则会出错
- 将内存中的该实例序列化为字节码文件。
在Worker节点中执行
- 将传输过来的字节码文件反序列化为类实例
- open:初始化这个组件类实例,可以加载消息队列消费端、JDBC链接池等非可序列化对象
- activate:该实例设置为活跃状态(有数据流驱动时)调用,过段时间暂时没有数据流驱动就会睡眠
- nextTuple:循环调用,可在这里从数据源获取数据emit到下一个节点,JStorm就会自动循环调用执行下去
- ack:往后emit的一个Tuple在acker节点察觉成功了,回调通知Spout
- fail:往后emit的一个Tuple在acker节点察觉失败或者超时了,回调通知Spout
- deactivate:没数据流驱动达到一段时间,进入睡眠前调用
- close:程序停止时调用,释放资源
Bolt生命周期
提交时
- 构造方法:初始化构造参数,其中包含的必须都是可序列化的
- getComponentConfiguration:获取该类特殊的配置参数,只和该组件相关的配置,通常
return null
- declareOutputFields:获取该组件会输出的流、字段列表,其后续的其他组件订阅相应的流或者字段需要和这里对应,否则会出错
- 将内存中的该实例序列化为字节码文件。
在Worker节点中执行
- 将传输过来的字节码文件反序列化为类实例
- prepare:初始化这个组件类实例,可以加载配置,数据处理类初始化,数据输出对象初始化
- execute:循环调用,可在这里从上个节点获取Tuple,进行相应处理之后emit到下一个节点,JStorm就会自动循环调用执行下去
- cleanup:程序停止时调用,释放资源
数据流向控制
- ShuffleGrouping:对符合条件的目标Worker,其中可能的多个Task,随机分配Task来接收和处理该Tuple
- FieldsGrouping:会按照指定的field值进行分配,可以保证相同field对应值的Tuple分配到相同一个Task中执行 —— 可以想象成拿指定的field的值hash取模决定哪个Task(具体算法没研究)
- 除了这两个其他暂时没用到,也感觉剩下的比较用不到,等用了再更
- 具体的流的聚合和分发,参考这篇博客,例子很详细:JStorm流的汇聚和分发
数据流传输过程
- Spout中的数据源取出一份数据(无限循环取出),作为一个Tuple,emit到下一个节点
- 根据Spout中declareOutputFields定义的字段和流,查阅后继订阅该节点或者流的Bolt,Tuple会被发送到每个订阅节点或者流的后继节点当中
- 后继订阅的Bolt节点接收到该Tuple,使用
tuple.getValueByField
通过上一节点declareOutputFields的字段名获取相应的字段值,也可以根据fields的声明顺序使用tuple.getValue
通过下标获取相应的值 - 拿到相应的数据之后,进行相关逻辑处理,之后emit到下一个节点当中,以此类推,直到最终节点将数据输出到mysql、ES、HDFS等存储系统当中
- emit时可以指定相应的streamId来指定当前的数据要传输到的哪个streamId当中(该组件的declareOutputFields需要声明所需的所有streamId),在Topology构建时后继节点指定该streamId来订阅相应的数据。
编程例子讲解
- 这个例子是一个单词计数程序,通过一组字符串数组中随机获取一个档次作为数据源往后输出,在后续节点统计各个单词被获取的总次数。
-
包括RandomSentenceSpout、SplitBolt、CountBolt三个节点,各个节点并行度都为1,是一个最简单的单条链式的拓扑,如下
RandomSentenceSpout
- 表示数据源,这里用从数组随机获取一个元素作为模拟数据源获取,日常开发通常是从MQ中获取相应数据进行数据流驱动。
/**
* RandomSentenceSpout实现了IRichSpout接口
* Spout需要实现的接口可以是:
* 1,IRichSpout:最基本的Spout,继承自ISpout, IComponent,沒有任何特殊方法(一般用这个)
* 2,IControlSpout:继承自IComponent,包括open,close,activate,deactivate,nextTuple,ack(Object msgId),fail等方法
*/
public class RandomSentenceSpout implements IRichSpout {
private static final long serialVersionUID = 4058847280819269954L;
private static final Logger logger = Logger.getLogger(RandomSentenceSpout.class);
//可以理解为JStorm的数据传输管道,通过这个对象将这个组件的数据传输到下一个组件当中
private SpoutOutputCollector _collector;
//随机生成对象
private Random _rand;
private String component;
/**
* Spout初始化的时候调用
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
component = context.getThisComponentId();
}
/**
* 系统框架会不断调用
*/
public void nextTuple() {
//模拟数据源
String[] sentences = new String[]{"Hello world! This is my first programme of JStorm",
"Hello JStorm,Nice to meet you!", "Hi JStorm, do you have a really good proformance",
"Goodbye JStorm,see you tomorrow"};
//随机取出字符串
String sentence = sentences[_rand.nextInt(sentences.length)];
//将得到的字符串输出到下一个组件
//!!!这里Values中值填充顺序要和下面declareOutputFields中字段声明顺序一致
_collector.emit(new Values(sentence), Time.currentTimeSecs());
Utils.sleep(1000);
}
@Override
public void ack(Object arg0) {
logger.debug("ACK!");
}
public void activate() {
logger.debug("ACTIVE!");
}
public void close() {
}
public void deactivate() {
}
public void fail(Object arg0) {
logger.debug("FAILED!");
}
/**
* 声明框架有哪些输出的字段
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//下一个组件通过word这个关键字拿到这个组件往后输出的单词sentence
declarer.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
SplitBolt
- 将获取的字符串通过空白符分割,并转为小写之后输出到下一个节点。
/**
* IBasicBolt:继承自IComponent,包括prepare,execut,cleanup等方法
*/
public class SplitBolt extends BaseBasicBolt {
private static final long serialVersionUID = 7104767103420386784L;
private static final Logger logger = Logger.getLogger(SplitBolt.class);
private String component;
/**
* cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。(基本只能在local mode使用)
* 但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。
* cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群),
* 并且你想在关闭一些topology的时候避免资源泄漏。
* (非 Javadoc)
* @see backtype.storm.topology.base.BaseBasicBolt#cleanup()
*/
@Override
public void cleanup() {
}
//接收消息之后被调用的方法
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
//以下两个方式获取前驱节点发送过来的sentence,一个根据fieldName,一个根据字段声明顺序
// String sentence = input.getValueByField("word");
String sentence = input.getString(0);
String[] words = sentence.split("[,|\\s+]");
for (String word : words) {
word = word.trim();
//将非空单词输出到下一个节点
if (!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
}
/**
* prepare方法在worker初始化task的时候调用.
*
* prepare方法提供给bolt一个Outputcollector用来发射tuple。
* Bolt可以在任何时候发射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。
* 这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法 使用。
*/
@Override
public void prepare(Map stromConf, TopologyContext context) {
component = context.getThisComponentId();
}
/**
* declearOutputFields方法仅在有新的topology提交到服务器,
* 用来决定输出内容流的格式(相当于定义spout/bolt之间传输stream的name:value格式),
* 在topology执行的过程中并不会被调用.
* (非 Javadoc)
* @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
CountBolt
- 这个组件接收到的每个值都是单个单词,通过一个内存Map统计各个单词总数
- 后台设置一个异步线程10s一次输出当前Map中的各个单词总数
- 日常开发通常在这个终端节点将实时计算得到的结果输出到HDFS、mysql、HBase、ElasticSearch等存储系统当中
public class CountBolt extends BaseBasicBolt {
private Integer id;
private String name;
private Map<String, Integer> counters;
private String component;
private static final Logger LOG = Logger.getLogger(CountBolt.class);
//异步输出结果集的子线程
private AsyncLoopThread statThread;
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap<String, Integer>();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
//异步循环输出结果集
this.statThread = new AsyncLoopThread(new statRunnable());
LOG.info(stormConf.get("abc") + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
component = context.getThisComponentId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
// declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum"));
// LOG.info("set stream coord-"+component);
}
//接收消息之后被调用的方法
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// String str = input.getString(0);
String str = input.getStringByField("word");
if (!counters.containsKey(str)) {
//单词计数
counters.put(str, 1);
} else {
//单词计数
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
}
/**
* 异步输出结果集的死循环子线程
*/
class statRunnable extends RunnableCallback {
@Override
public void run() {
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
LOG.info("\n-- Word Counter [" + name + "-" + id + "] --");
for (Map.Entry<String, Integer> entry : counters.entrySet()) {
LOG.info(entry.getKey() + ": " + entry.getValue());
}
LOG.info("");
}
}
}
}
WordCountTopology主入口,拓扑构建
- 这里通过setSpout和setBolt将上面的三个节点连接成线 —— 即最开始说明的链式拓扑图
- JStorm提交执行相关执行参数统一写入一个Properties或Yaml配置文件中,命令行执行第一个参数是该配置文件的路径
public class WordCountTopology {
private static Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);
//装载配置文件配置参数
private static Map conf = new HashMap<Object, Object>();
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Please input configuration file");
System.exit(-1);
}
//加载配置文件配置到内存
LoadConf(args[0]);
//构建JStorm拓扑
TopologyBuilder builder = setupBuilder();
System.out.println("Topology准备提交");
//提交任务到集群
submitTopology(builder);
System.out.println("Topology提交完成");
}
//!!!!这里通过setSpout和setBolt设置各个节点之间的连接关系,
// 是这里把所有各自独立的节点用线连接起来,构建成一张具体的任务执行拓扑图
private static TopologyBuilder setupBuilder() throws Exception {
TopologyBuilder builder = new TopologyBuilder();
/*
* 设置spout和bolt,完整参数为
* 1,spout的id(即name)
* 2,spout对象
* 3,executor数量即并发数,也就是设置多少个executor来执行spout/bolt(此项没有默认null)
*/
//setSpout,声明Spout名称Id为sentence-spout,并行度1
builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1);
//setBolt:SplitBolt的grouping策略是上层随机分发,CountBolt的grouping策略是按照上层字段分发
//如果想要从多个Bolt获取数据,可以继续设置grouping
//声明Bolt名称Id为split-bolt,并行度1
builder.setBolt("split-bolt", new SplitBolt(), 1)
//设置该Bolt的数据源为sentence-spout的输出
.shuffleGrouping("sentence-spout");
//声明Bolt名称Id为count-bolt,并行度1
builder.setBolt("count-bolt", new CountBolt(), 1)
//设置该Bolt的数据源为sentence-spout和split-bolt的输出
//fieldsGrouping保证相同word对应的值发送到同一个Task节点,这是单词计数业务需要
.fieldsGrouping("split-bolt", new Fields("word"))
.fieldsGrouping("sentence-spout", new Fields("word"));
return builder;
}
//提交任务到JStorm集群
private static void submitTopology(TopologyBuilder builder) {
try {
if (local_mode(conf)) {//本地模式,需要有本地JStorm环境支持
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(
String.valueOf(conf.get("topology.name")), conf,
builder.createTopology());
Thread.sleep(200000);
cluster.shutdown();
} else {
StormSubmitter.submitTopology(
String.valueOf(conf.get("topology.name")), conf,
builder.createTopology());
}
} catch (Exception e) {
LOG.error(e.getMessage(), e.getCause());
}
}
//加载Properties配置文件
private static void LoadProperty(String prop) {
Properties properties = new Properties();
try {
InputStream stream = new FileInputStream(prop);
properties.load(stream);
} catch (FileNotFoundException e) {
System.out.println("No such file " + prop);
} catch (Exception e1) {
e1.printStackTrace();
return;
}
conf.putAll(properties);
}
//加载Yaml配置文件
private static void LoadYaml(String confPath) {
Yaml yaml = new Yaml();
try {
InputStream stream = new FileInputStream(confPath);
conf = (Map) yaml.load(stream);
if (conf == null || conf.isEmpty() == true) {
throw new RuntimeException("Failed to read config file");
}
} catch (FileNotFoundException e) {
System.out.println("No such file " + confPath);
throw new RuntimeException("No config file");
} catch (Exception e1) {
e1.printStackTrace();
throw new RuntimeException("Failed to read config file");
}
}
//根据后缀名选择加载配置文件方案
private static void LoadConf(String arg) {
if (arg.endsWith("yaml")) {
LoadYaml(arg);
} else {
LoadProperty(arg);
}
}
public static boolean local_mode(Map conf) {
String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
if (mode != null) {
if (mode.equals("local")) {
return true;
}
}
return false;
}
}
批量用法
基本的用法是每次处理一个tuple,但是这种效率比较低,很多情况下是可以批量获取消息然后一起处理,批量用法对这种方式提供了支持。打开代码可以很明显地发现jstorm和storm的有着不小的区别:
// storm 中的定义
public interface IBatchSpout extends Serializable {
void open(Map conf, TopologyContext context);
void emitBatch(long batchId, TridentCollector collector);// 批次发射tuple
void ack(long batchId); // 成功处理批次
void close();
Map getComponentConfiguration();
Fields getOutputFields();
}
// jstorm中的定义
public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
}
另外如果用批次的话就需要改用BatchTopologyBuilder来构建拓扑结构,在IBatchSpout中主要实现的接口如下:
- execute:虽然和IBolt中名字、参数一致,但是增加了一些默认逻辑
- 入参的input.getValue(0)表示批次(BatchId)。
- 发送消息时collector.emit(new Values(batchId, value)),发送的列表第一个字段表示批次(BatchId)。
- commit:批次成功时调用,常见的是修改offset。
- revert:批次失败时调用,可以在这里根据offset取出批次数据进行重试。
Ack机制
为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,如图当定义Topology时指定Acker,JStorm除了Topology本身任务外,还会启动一组称为Acker的特殊任务,负责跟踪Topolgogy DAG中的每个消息。每当发现一个DAG被成功处理完成,Acker就向创建根消息的Spout任务发送一个Ack信号。Topology中Acker任务的并行度默认parallelism hint=1,当系统中有大量的消息时,应该适当提高Acker任务的并行度。
Acker按照Tuple Tree的方式跟踪消息。当Spout发送一个消息的时候,它就通知对应的Acker一个新的根消息产生了,这时Acker就会创建一个新的Tuple Tree。当Acker发现这棵树被完全处理之后,他就会通知对应的Spout任务。
Acker任务保存了数据结构
Map<MessageID,Map< TaskID, Value>>
,其中MessageID是Spout根消息ID,TaskID是Spout任务ID,Value表示一个64bit的长整型数字,是树中所有消息的随机ID的异或结果。通过TaskID,Acker知道当消息树处理完成后通知哪个Spout任务,通过MessageID,Acker知道属于Spout任务的哪个消息被成功处理完成。Value表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的MessageID发送过来做异或。当Acker发现一棵树的Value值为0的时候,表明这棵树已经被成功处理完成。-
举例说明具体流程,以下为拓扑:
Acker数据的变化过程:(算法)
Step1:A发送T0给B后:
产生一个随机数r0,树种存R0:R0=r0
<id0,<taskA,R0>>
# ---------
Step2.B接收到T0并成功处理后向C发送T1,向D发送T2:
接收到上级传过来的R0,自己传给两个下家,产生两个随机数代表下家存入树中:R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
# ---------
Step3.C接收到T1并成功处理后:
接收到上家传过来的r1,没有下家:R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
# ---------
Step4.D接收到T2并成功处理后:
接收到上家传过来的r2,没有下家:R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>
当结果为0时Acker可以通知taskA根消息id0的消息树已被成功处理完成,调用Spout的ack方法通知,若超时发现消息树中值不为0,调用Spout中的fail。
-
整体节点间通信:
- 需要指出的是,Acker并不是必须的,当实际业务可以容忍数据丢失情况下可以不用Acker,对数据丢失零容忍的业务必须打开Acker,另外当系统的消息规模较大是可适当增加Acker的并行度。
JStorm事务
事务拓扑并不是新的东西,只是在原始的ISpout、IBolt上做了一层封装。在事务拓扑中以并行(processing)和顺序(commiting)混合的方式来完成任务,使用Transactional Topology可以保证每个消息只会成功处理一次。不过需要注意的是,在Spout需要保证能够根据BatchId进行多次重试,在这里有一个基本的例子,这里有一个不错的讲解。
Trident
这次一种更高级的抽象(甚至不需要知道底层是怎么map-reduce的),所面向的不再是spout和bolt,而是stream。主要涉及到下面几种接口:
- 在本地完成的操作
- Function:自定义操作。
- Filters:自定义过滤。
- partitionAggregate:对同批次的数据进行local combiner操作。
- project:只保留stream中指定的field。
- stateQuery、partitionPersist:查询和持久化。
- 决定Tuple如何分发到下一个处理环节
- shuffle:随机。
- broadcast:广播。
- partitionBy:以某一个特定的field进行hash,分到某一个分区,这样该field位置相同的都会放到同一个分区。
- global:所有tuple发到指定的分区。
- batchGlobal:同一批的tuple被放到相同的分区(不同批次不同分区)。
- partition:用户自定义的分区策略。
- 不同partition处理结果的汇聚操作
- aggregate:只针对同一批次的数据。
- persistentAggregate:针对所有批次进行汇聚,并将中间状态持久化。
- 对stream中的tuple进行重新分组,后续的操作将会对每一个分组独立进行(类似sql中的group by)
- groupBy
- 将多个Stream融合成一个
- merge:多个流进行简单的合并。
- join:多个流按照某个KEY进行UNION操作(只能针对同一个批次的数据)。
在这里有一个jstorm中使用Trident的简单例子。
故障恢复
-
节点故障
- Nimbus故障。Nimbus本身无状态,所以Nimbus故障不会影响正在正常运行任务,另外Nimbus HA保证Nimbus故障后可以及时被备份Nimbus接管。
- Supervisors节点故障。Supervisor故障后,Nimbus会将故障节点上的任务迁移到其他可用节点上继续运行,但是Supervisor故障需要外部监控并及时手动重启。
- Worker故障。Worker健康状况监控由Supervisor负责,当Woker出现故障时,Supervisor会及时在本机重试重启。
- Zookeeper节点故障。Zookeeper本身具有很好的故障恢复机制,能保证至少半数以上节点在线就可正常运行,及时修复故障节点即可。
-
任务失败
- Spout失败。消息不能被及时被Pull到系统中,造成外部大量消息不能被及时处理,而外部大量计算资源空闲。
- Bolt失败。消息不能被处理,Acker持有的所有与该Bolt相关的消息反馈值都不能回归到0,最后因为超时最终Spout的fail将被调用。
- Acker失败。Acker持有的所有反馈信息不管成功与否都不能及时反馈到Spout,最后同样因为超时Spout的fail将被调用。
- 任务失败后,需要Nimbus及时监控到并重新分配失败任务。
JStorm使用感受
- JStorm各个节点之间是松耦合的,各个节点之间的通信只和Tuple数据流结构相关,其他处理逻辑各自独立
- JStorm不处理数据的存储服务,计算结果自行存储到HDFS、HBase、Mysql、ElasticSearch等存储系统当中
- JStorm的拓扑节点设计中,应该把延时操作分发到多个节点当中执行,每个节点只处理各自单一的功能逻辑,如上面的例子,我把单词分割和单词计数分成两个Bolt来实现,这才是流式计算的特点,让数据流动起来,而不是在一个节点完成所有工作,也保证了程序可用性更强
- JStorm各个节点内部的处理逻辑非常开放,想怎么处理都行,只要最终往后输出相应的Tuple即可,编程时非常自由,不像MapReduce,很多操作都在MR模型中得到限制