1、Trident简介
- 在原Storm实时流计算基础上的高层次抽象,抽象掉了事务处理和状态管理的细节,Storm0.7版本的Transactional Topology现在已经被Trident完全替代,废弃不用了;
- 将tuple组成一批批进行离散的事务处理;
- 在trident中保留了Spout,但是不再有bolt组件,将之前在bolt中所实现的数据处理逻辑抽象成一系列的Operation,比如函数功能、过滤和聚合操作
- 提供数据至少被处理一次,甚至有且仅有一次的保障,事务数据持久化,以及一系列公共的流分析处理操作
- Trident以Batch的形式处理Stream
- 底层仍然是spout和bolt,就是说执行的时候,storm框架还是会把Trident解析构建成spout和bolt在执行,如下
image.png
image.png
2、Trident事务及实现原理
- Trident事务有三个层次:
- 非事务 no-transactional:当失败没有重试的情况下,tuple可能至多一次被处理;当出现在多个批次中都被成功处理的情况下,可能至少一次被处理
- 严格事务 transactional:tuple只会出现在一个批次中,同一批次内的tuples在该批次失败重试不会发生变化,任何tuple都会出现在某个批次中,不会被跳过;也就是说数量流被分成固定的批次;
非常严格,当批次中某个tuple执行失败,批次重试仍失败的话,处理会被挂起 - 不透明事务 opaque transactional:同一个tuple可能出现在多个批次中,但只会在其中一个批次里面处理成功,即保障仅仅一次被处理成功,相对于严格的事务,不透明事务提供了容错性,只是保证尽可能将tuple处理成功,当某个tuple在一个批次中失败,可以在另一个批次中重试;也就是exactly-once有且只有一次成功
注: storm-kafka提供这两种OpaqueTridentKafkaSpout、TransactionalTridentKafkaSpout事务Spout
- Trident事务实现原理
- 将多个Tuples按小批次进行处理
- 给每个批次分配一个唯一的事务ID,如果该批次失败重试,这个事务ID不变
- 状态更新按批次顺序进行,后面的批次必须等前面的批次更新完成才能进行更新
3、Trident编码操作
-
自定义实现TridentSpout
- 实现ITridentSpout接口
- MetaData 批次信息元数据,存储在Zookeeper中
- BatchCoordinator 分配批次
- Emitter 发射批次内的tuple到下一个组件
- 常用的Kafka Spout:KafkaSpout、TransactionalTridentKafkaSpout、OpaqueTridentKafkaSpout
-
Trident操作注意事项
- 区分是否需要跨网络传输
- 区分是否跨分区
- 对于Trident的聚合操作,要区分是否本批次聚合还是全局聚合
-
Trident操作类型
- 分区本地操作:作用在每个分区本地,不用跨网络传输的操作,如Filter、Function
- 重分区操作:将Stream流数据重分区,但不改变内容(包括跨网络数据传输)
- 聚合操作:需要跨网络传输数据
- 在分组的流stream上的操作
- Merges 和 Joins
注:Partition:在Storm中并发的最小执行单位是task;在Trident中partition相当于task的角色,也是最小执行单位
Trident并发度 parallelismHint
在某个操作调用后,stream调用 parallelismHint,设置前面这个操作的并发度,如下
.each(new Fields("str"), new SplitFunction(),new Fields("word"))
.parallelismHint(2)//设置2个executor来执行splitfunction操作
-
Trident Function
- 对输入的Tuple进行某种函数操作,对输入tuple的一系列Fields进行处理,输出零到多个Tuple,输出的Fields会追加到Trident数据流上,但如果function没有向后面执行emit操作,则会将原来的输入tuple过滤掉
- 无需跨网络传输数据
- 实现Function接口,或者继承BaseFunction抽象类
-
Trident Filter
- 过滤器,输入Tuple,执行规则判断是否保留该Tuple
- 无需跨网络传输
- isKeep方法,这里实现是否保留tuple的规则判断逻辑
- 实现Filter接口,或者继承BaseFilter抽象类
-
聚合链
- chainedAgg
- partitionAggregate
-
chainEnd
image.png
-
partitionAggregate 分区组合
- 对各分区partition(最小执行单位,一个task)内的一个批次tuple数据进行函数操作,但与之前函数操作不同的是,partitionAggregate会替换掉输入tuple,而不是将输出tuple追加到流上
- 这里用到的函数操作有三种,
- CombinerAggregate:Sum,Count
- ReduceAggregate
- Aggregator
-
Projection 投影
-
对输入tuple截取只需要输出的的Fields,即去掉后面不需要的keyvalue
image.png
-
-
Trident Repartitioning 重分区操作
- Repartitioning Operations类似于Storm中的数据流分组
- Shuffle : 随机重分区
- global:所有tuple进入相同的一个Partition上
- partitionBy:按字段重分区,保证了字段值相同的tuple进入相同的partition上
- batchGlobal:相同批次内的tuple进入同一个Partition上,不同批次的tuple进入不同的Partition上
- broadcast:广播方式重分区,即将每个tuple复制到后面的所有partition上,一般结合drpc使用
- partition:自定义分区,实现接口backtype.storm.grouping.CustomStreamGrouping
-
Trident Aggregate
- aggregate:单独对每个批次的数据进行聚合
-
persistentAggregate:对数据流中处理过的所有tuple进行聚合操作(全局聚合),并将结果存储在内存或者其他存储设备上
image.png
-
Group By 分组
- 在partition By 基础上对指定fields字段值相同的tuple进行分组
- 与partitionBy的区别:partitionBy只讲字段值求HashCode,再与tasks数取模得到结果,根据结果重新分配到相应的task里去,而groupBy,则进一步对指定的Fields字段值相同的tuple进行分组
- groupBy 结合partitionAggregator,进行一个批次内各分区内的分组统计
- groupBy 结合persistentAggregator,进行全局分组统计
-
Trident Status
- 在进行prisistentAggregate操作时,需要不断更新结果,所以需要将中间结果保存在内存、或者其他存储设备中,这个过程还需要考虑更新过程中容错性问题
-
使用内存存储,如果Storm集群重启,原来的结果数据也丢失,如下
image.png - 使用外部存储设备,即使storm集群重启,也可以在原来的基础上进行更新结果
4、DRPC
- DRPC:Distributed RPC ,分布式RPC,目前DRPC已经结合Trident一起使用
image.png
-
DRPC本地模式测试
- LocalDRPC localDRPC = new LocalDRPC();//构造本地DRPC客户端
- tridentTopology.newDRPCStream("functionName",localDRPC)
- LocalCluster cluster = new LocalCluster();//本地模式提交Topology
cluster.submitTopology("wordcountTrident",new Config(),togology.build()); - 发送DRPC请求,并得到响应结果
String result = localDRPC.execute("words","hello world");
-
DRPC远程模式测试
- 修改conf/storm.yaml配置文件,添加drpc.server和drpc.port参数
- 启动DRPC进程
$ nohup bin/storm drpc > /dev/null 2>&1 & - 编写topology,并打jar包提交到Storm集群上运行
- 编写drpc客户端发送DRPC请求,并得到结果