Trident join

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**

  • Created by tangchunsong on 2018/7/11.
  • 结论:
  • topology.join 是将两个stream内的两个batch的tuple进行join,join完了,这两个batch就pass了
  • 轮到这两个流的分别的下面的batch进行join了
  • 两个batch进行join是:
  •  只有key能join上,才会输出,inner join
    

*/
public class StreamJoinMain {

public static StormTopology buildTopology() {
FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1"), 3, new Values("a", "1"),
new Values("b", "2"), new Values("a", "3"), new Values("a", "4"));
spout1.setCycle(true);//Spout是否循环发送

FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("key", "value2"), 3, new Values("a", "1"),
        new Values("b", "2"), new Values("a", "3"), new Values("a", "5"), new Values("a", "6"));
spout2.setCycle(true);//Spout是否循环发送

TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream("spout1", spout1).parallelismHint(2);
Stream stream2 = topology.newStream("spout2", spout2).parallelismHint(2);

topology.join(stream1, new Fields("key"), stream2, new Fields("key"), new Fields("key", "value1", "value2"))
        .peek(new Consumer() {
          public void accept(TridentTuple input) {
            System.out.println(input.toString());
            try {
              Thread.sleep(2000);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });

return topology.build();

}
public static void main(String[] args) {
Config conf = new Config();
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology());
}
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,969评论 19 139
  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,505评论 0 13
  • 分布式流处理需求日益增加,包括支付交易、社交网络、物联网(IOT)、系统监控等。业界对流处理已经有几种适用的框架来...
    Albert陈凯阅读 11,174评论 1 29
  • 原文链接 译者:魏勇 Trident 中含有对状态化(stateful)的数据源进行读取和写入操作的一级抽象...
    Albert陈凯阅读 512评论 0 1
  • 我问自己你是否还年轻你的灵魂是否还很纯净 人群中是谁在艰难走走停停 又是谁在仰望着命运...... 28岁之前,没...
    七少Jade阅读 267评论 0 0