Storm基础(四)保证消息处理

原文链接:Guaranteeing Message Processing

本人原创翻译,转载请注明出处

Storm提供了几种不同级别的保证消息处理机制,包括best effort, at least once, 通过Trident实现的exactly once。这篇文章描述了Storm如何保证at least once处理。

一个消息被完全处理(fully processed)究竟是什么意思?

一个tuple从spout中发出可能触发成千上万个tuples的创建。例如,单词计数topology:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
                                               22133,
                                               "sentence_queue",
                                               new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

这个topology 从Kestrel队列中读取句子,把句子拆分成单词组,然后每次emit一个单词(如果单词重复出现,那么出现多少次emit多少次)。这解释了一个tuple如何导致n个tuples被创建:句子中的每个单词,都会成为一个单词tuple和一个更新单词计数的tuple。消息树大概像这样:


Storm定义一个从spout发出的tuple被完全处理,当且仅当tuple树已经为空并且树中的每个消息都已被处理。如果tuple没有在给定的超时时间(timeout)内被完全处理,就定义为处理失败。timeout可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来配置,默认是30秒。

当一个消息被完全处理或没有被完全处理时发生了什么?

为了理解这个问题,让我们看看tuple从spout开始的生命周期。作为参考,这里是spouts实现的接口:

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,Storm通过调用Spout的nextTuple方法来请求一个tuple。Spout使用SpoutOutputCollector(在open函数中提供)来emit一个tuple到某个输出流。当emitting tuple的时候,Spout设置了一个"message id",后续会用来识别tuple。举个例子,KestrelSpout从kestrel队列中读取消息,由Kestrel给出id并设置为"message id",然后emit。像这样发出消息:

_collector.emit(new Values("field1", "field2", 3) , msgId);

接下来,tuple被发送给消费bolts,Storm负责维护消息树。如果Storm检测到一个tuple被完全处理了,Storm会调用Spout的ack方法(携带参数message id)。同样的,如果tuple处理超时,Storm将调用Spout的fail方法。注意,一个tuple只会被创建它的那个Spout任务acked或failed,如果Spout被集群中的多个任务执行,tuple不会被非创建它的任务acked或failed。

再次以KestrelSpout为例来说明Spout如何保证消息处理。当KestrelSpout从Kestrel队列中取出一个消息,它"opens"这个消息,消息并没有真的从队列中取下来,只是设置了一个挂起("pending")状态,等待消息处理完成的确认。处于挂起状态的消息不会被发送给其他队列消费者。此外,如果一个客户端失去连接,它的所有挂起状态的消息会被放回队列。KestrelSpout会给SpoutOutputCollector传递一个"message id"参数,稍后,KestrelSpout的ack和fail函数被调用,KestrelSpout会给Kestrel发一个带"message id"的ack或fail消息,进而将消息移除或放回队列。

什么是Storm的可靠性API?

要想利用Storm的可靠性能力要做两件事。首先,任何时候你在tuple树中创建新的link都要通知Storm。其次,当你完成一个独立tuple的处理时也要通知Storm。通过做这两件事,Storm可以检测tuple树是否处理完毕并恰当的处理spout tuple的ack或fail。Storm的API提供了一种简洁的方式来完成这些任务。

在tuple树中指定一个link被称作锚定(anchoring)。在你emit一个新的tuple时就同步完成了锚定。举个例子,下面这个bolt把一个包含句子的tuple拆分成每个单词的tuple:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

每个单词tuple通过指定输入tuple为emit的第一个参数而锚定。由于单词tuple已被锚定,在单词tuple处理失败的时候,tuple树的根spout tuple将被重新传输。相反的,让我们看看如果像这样emit tuple会发生什么:

_collector.emit(new Values(word));

这样emit的单词tuple没有被锚定,如果tuple处理失败,根tuple不会被重传。取决于你的容错需求,有时候以非锚定的方式emit tuple也是恰当的。

一个输出tuple可以被锚定到多个输入tuple,这对流连接或流聚合(streaming joins or aggregations)很有用。被多个输入锚定的tuple处理失败,会导致多个根tuple重传。例子:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多锚定(Multi-anchoring)将把输出tuple加入到多个tuple树。注意,这可能会破坏树的结构并且创建tuple 有向无环图(DAGs)。例如:


Tuple DAG

Storm的实现支持有向无环图和树。

锚定就是你如何说明tuple树——下一个也是最后一个关于Storm可靠性API的点是当你处理完一个独立的tuple时,如何说明tuple树。通过调用OutputCollector的ack和fail来实现这个操作。如果你往回看例子SplitSentence,你会看到在所有单词tuple被emit之后输入tuple被确认了(acked)。

你可以使用OutputCollector 的fail方法来立即使根tuple(spout tuple)失败。例如,你的应用也许会选择捕获数据库客户端的异常,显式的使输入tuple失败。通过显式的使tuple失败,根tuple可以比等待超时更快的被重传。

每个tuple都应该被ack或fail。Storm占用了内存来跟踪每一个tuple,如果不ack/fail每个tuple,任务可能最终会耗尽内存。

许多bolts使用了一种通用模式来读取和发出输入tuple,在execute方法的最后ack tuple。这些bolts归类为过滤器和简单函数(filters and simple functions)。Storm提供了一个BasicBolt接口封装了这种模式,SplitSentence例子可以用BasicBolt实现:

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

这种实现比之前的实现简单,语义上一致。Tuples自动锚定到输入tuple,execute方法完成时自动ack。
相反,实现聚合和连接的bolts可能会延迟ack,直到一组tuples处理完毕。聚合和连接一般也会多锚定(multi-anchor),IBasicBolt不能自动做这些。

如果tuples可以重传,程序该如何正确工作?

软件设计的一贯答案是“取决于”。如果你一定要一个答案,考虑使用Trident API。某些情况下,如要做很多分析并且可以容忍丢失数据,那么可以通过设置acker bolts为0(Config.TOPOLOGY_ACKERS)来禁用容错。但在有些情况下,你想要确保每个数据都被至少处理了一次并且没有丢失。

Storm如何有效的实现可靠性?

Storm的topology有一些特殊的“acker”任务,负责追踪每个spout tuple的tuples DAG,一旦acker发现DAG完成了,它就会发一个确认消息给spout。你可以通过Config.TOPOLOGY_ACKERS设置acker任务的数量。Storm默认是每个worker有一个acker。

不管是spout还是bolt发出的tuple都有一个64位的id。每个tuple都知道tuple树中的所有spout tuples的ids。当你发出一个新tuple时,老的tuple锚定的spout tuples ids被复制到新的tuple。当一个tuple被确认了,它会发一个tuple树如何变更的消息给acker任务,特别地,消息可能像这样:“我已经完成了tuple树中这个spout tuple的处理,树中有一些新的tuples以我为锚”。

例如,如果tuples “D”和“E”是基于tuple “C”而创建,当“C”确认时,tuple树的变化如下:

由于在“D”和“E”创建的同时,“C”被从树中移除了,树永远不会过早的(prematurely)完成。注:这句不是很理解

还有一些细节要提一下。之前提到可以有多个acker任务,那么当一个tuple被确认时,如何知道由哪一个任务发送确认信息?

Storm使用mod hashing来映射spout tuple id到acker任务。由于每个tuple都携带了它所在所有树中的spout tuple ids,因此知道该与哪个acker任务通信。

另一个细节是acker任务如何跟踪spout任务。当spout task发出一个新tuple,它只是简单的发送消息到恰当的acker,告诉它为这个spout tuple负责。之后当一个acker发现树已经完成,它就知道该给哪个任务id发完成信息。

acker任务不会显式追踪tuples树。对于有好几万节点(甚至更多)的大tuple树,跟踪所有的tuple树可能会造成内存不够用。ackers采用一种策略,对每个spout tuple只要求固定数量的内存(大约20字节)。这个追踪算法是理解Storm工作的关键,也是Storm主要的突破之一。

acker任务存储了一个spout tuple到一组值的map。第一个值是任务id,用来发送完成信息。第二个值是64位数字,名为“ack val”,这个值代表了整个tuple树的状态,无论树多大多小。它只是简单的把树中所有已创建或确认的tuple ids做xor运算。

当一个acker任务发现“ack val”变成了0,它就知道tuple树完成了。由于tuple ids是64位随机数,“ack val”意外变成0的概率极小。用数学知识算一下,每秒10K个acks,大概要花50,000,000年才会发生一个错误。即使发生错误,也只是丢失数据。

现在你理解了可靠性算法,让我们过一遍失败的情形,看看每种情形下Storm如何避免数据丢失:

  • 由于任务异常终止,tuple未被确认:这种情况下失败tuple的树根处的spout tuple将超时并重发。
  • acker任务异常终止:这种情况下,所有这个akcer跟踪的spout tuples都会超时并重发。
  • spout任务异常终止:这种情况下,spout的源负责重发消息。例如,客户端失去连接时,像Kestrel和RabbitMQ这样的队列将把挂起的消息重新放回队列。

如你所见,Storm的可靠性机制是完全分布式、大规模和容错的。

调教reliability

acker任务是轻量级的,所以在一个topology里不需要很多个acker。你可以通过Storm UI(组件id“__acker”)跟踪acker的性能,如果吞吐量不行,可以增加acker的数量。

如果可靠性对你不重要——你不关心丢失tuples,那么你可以通过不追踪tuple树来增加性能。不追踪tuple树可以减半消息传输的数量,另外,下游的tuple可以保存更少的ids,节省了网络带宽。

有三种方式可以移除可靠性。第一种是设置Config.TOPOLOGY_ACKERS为0。这种情况下,Storm会在spout发出tuple时立即调用ack方法。

第二种是移除消息上的可靠性。你可以在调用SpoutOutputCollector.emit方法的时候不传消息id,这样就关闭了对某个spout tuple的追踪。

最后,如果你不关心下游tuples是否处理失败,你可以在emit它们的时候不锚定它们。由于它们没有锚定到任何spout tuples上,它们没被确认不会导致任何spout tuples失败。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,817评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,329评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,354评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,498评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,600评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,829评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,979评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,722评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,189评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,519评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,654评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,940评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,762评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,993评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,382评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,543评论 2 349

推荐阅读更多精彩内容