Storm高阶(一):Trident

1、Trident简介

  • 在原Storm实时流计算基础上的高层次抽象,抽象掉了事务处理和状态管理的细节,Storm0.7版本的Transactional Topology现在已经被Trident完全替代,废弃不用了;
  • 将tuple组成一批批进行离散的事务处理;
  • 在trident中保留了Spout,但是不再有bolt组件,将之前在bolt中所实现的数据处理逻辑抽象成一系列的Operation,比如函数功能、过滤和聚合操作
  • 提供数据至少被处理一次,甚至有且仅有一次的保障,事务数据持久化,以及一系列公共的流分析处理操作
  • Trident以Batch的形式处理Stream
  • 底层仍然是spout和bolt,就是说执行的时候,storm框架还是会把Trident解析构建成spout和bolt在执行,如下
image.png
image.png

2、Trident事务及实现原理

  • Trident事务有三个层次:
    • 非事务 no-transactional:当失败没有重试的情况下,tuple可能至多一次被处理;当出现在多个批次中都被成功处理的情况下,可能至少一次被处理
    • 严格事务 transactional:tuple只会出现在一个批次中,同一批次内的tuples在该批次失败重试不会发生变化,任何tuple都会出现在某个批次中,不会被跳过;也就是说数量流被分成固定的批次;
      非常严格,当批次中某个tuple执行失败,批次重试仍失败的话,处理会被挂起
    • 不透明事务 opaque transactional:同一个tuple可能出现在多个批次中,但只会在其中一个批次里面处理成功,即保障仅仅一次被处理成功,相对于严格的事务,不透明事务提供了容错性,只是保证尽可能将tuple处理成功,当某个tuple在一个批次中失败,可以在另一个批次中重试;也就是exactly-once有且只有一次成功

注: storm-kafka提供这两种OpaqueTridentKafkaSpout、TransactionalTridentKafkaSpout事务Spout

  • Trident事务实现原理
    • 将多个Tuples按小批次进行处理
    • 给每个批次分配一个唯一的事务ID,如果该批次失败重试,这个事务ID不变
    • 状态更新按批次顺序进行,后面的批次必须等前面的批次更新完成才能进行更新

3、Trident编码操作

  • 自定义实现TridentSpout

    • 实现ITridentSpout接口
    • MetaData 批次信息元数据,存储在Zookeeper中
    • BatchCoordinator 分配批次
    • Emitter 发射批次内的tuple到下一个组件
    • 常用的Kafka Spout:KafkaSpout、TransactionalTridentKafkaSpout、OpaqueTridentKafkaSpout
  • Trident操作注意事项

    • 区分是否需要跨网络传输
    • 区分是否跨分区
    • 对于Trident的聚合操作,要区分是否本批次聚合还是全局聚合
  • Trident操作类型

    • 分区本地操作:作用在每个分区本地,不用跨网络传输的操作,如Filter、Function
    • 重分区操作:将Stream流数据重分区,但不改变内容(包括跨网络数据传输)
    • 聚合操作:需要跨网络传输数据
    • 在分组的流stream上的操作
    • Merges 和 Joins
      注:Partition:在Storm中并发的最小执行单位是task;在Trident中partition相当于task的角色,也是最小执行单位
  • Trident并发度 parallelismHint
    在某个操作调用后,stream调用 parallelismHint,设置前面这个操作的并发度,如下
    .each(new Fields("str"), new SplitFunction(),new Fields("word"))
    .parallelismHint(2)//设置2个executor来执行splitfunction操作

  • Trident Function

    • 对输入的Tuple进行某种函数操作,对输入tuple的一系列Fields进行处理,输出零到多个Tuple,输出的Fields会追加到Trident数据流上,但如果function没有向后面执行emit操作,则会将原来的输入tuple过滤掉
    • 无需跨网络传输数据
    • 实现Function接口,或者继承BaseFunction抽象类
  • Trident Filter

    • 过滤器,输入Tuple,执行规则判断是否保留该Tuple
    • 无需跨网络传输
    • isKeep方法,这里实现是否保留tuple的规则判断逻辑
    • 实现Filter接口,或者继承BaseFilter抽象类
  • 聚合链

    • chainedAgg
    • partitionAggregate
    • chainEnd


      image.png
  • partitionAggregate 分区组合

    • 对各分区partition(最小执行单位,一个task)内的一个批次tuple数据进行函数操作,但与之前函数操作不同的是,partitionAggregate会替换掉输入tuple,而不是将输出tuple追加到流上
    • 这里用到的函数操作有三种,
      • CombinerAggregate:Sum,Count
      • ReduceAggregate
      • Aggregator
  • Projection 投影

    • 对输入tuple截取只需要输出的的Fields,即去掉后面不需要的keyvalue


      image.png
  • Trident Repartitioning 重分区操作

    • Repartitioning Operations类似于Storm中的数据流分组
    • Shuffle : 随机重分区
    • global:所有tuple进入相同的一个Partition上
    • partitionBy:按字段重分区,保证了字段值相同的tuple进入相同的partition上
    • batchGlobal:相同批次内的tuple进入同一个Partition上,不同批次的tuple进入不同的Partition上
    • broadcast:广播方式重分区,即将每个tuple复制到后面的所有partition上,一般结合drpc使用
    • partition:自定义分区,实现接口backtype.storm.grouping.CustomStreamGrouping
  • Trident Aggregate

    • aggregate:单独对每个批次的数据进行聚合
    • persistentAggregate:对数据流中处理过的所有tuple进行聚合操作(全局聚合),并将结果存储在内存或者其他存储设备上


      image.png
  • Group By 分组

    • 在partition By 基础上对指定fields字段值相同的tuple进行分组
    • 与partitionBy的区别:partitionBy只讲字段值求HashCode,再与tasks数取模得到结果,根据结果重新分配到相应的task里去,而groupBy,则进一步对指定的Fields字段值相同的tuple进行分组
    • groupBy 结合partitionAggregator,进行一个批次内各分区内的分组统计
    • groupBy 结合persistentAggregator,进行全局分组统计
  • Trident Status

    • 在进行prisistentAggregate操作时,需要不断更新结果,所以需要将中间结果保存在内存、或者其他存储设备中,这个过程还需要考虑更新过程中容错性问题
    • 使用内存存储,如果Storm集群重启,原来的结果数据也丢失,如下


      image.png
    • 使用外部存储设备,即使storm集群重启,也可以在原来的基础上进行更新结果

4、DRPC

  • DRPC:Distributed RPC ,分布式RPC,目前DRPC已经结合Trident一起使用
image.png
  • DRPC本地模式测试

    • LocalDRPC localDRPC = new LocalDRPC();//构造本地DRPC客户端
    • tridentTopology.newDRPCStream("functionName",localDRPC)
    • LocalCluster cluster = new LocalCluster();//本地模式提交Topology
      cluster.submitTopology("wordcountTrident",new Config(),togology.build());
    • 发送DRPC请求,并得到响应结果
      String result = localDRPC.execute("words","hello world");
  • DRPC远程模式测试

    • 修改conf/storm.yaml配置文件,添加drpc.server和drpc.port参数
    • 启动DRPC进程
      $ nohup bin/storm drpc > /dev/null 2>&1 &
    • 编写topology,并打jar包提交到Storm集群上运行
    • 编写drpc客户端发送DRPC请求,并得到结果
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,188评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,464评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,562评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,893评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,917评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,708评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,430评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,342评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,801评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,976评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,115评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,804评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,458评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,008评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,135评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,365评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,055评论 2 355

推荐阅读更多精彩内容