Storm自定义Scheme切分源数据

前言

翻看师傅写的一个strom计步算法又发现了一个新技能。get之后就赶紧来写笔记。没办法,对于LZ这种前学后忘,不长记性的搬砖工人,记笔记、写博客是唯一与遗忘作斗争最有效的办法,不说废话,直奔主题。

在用strom与kafka做整合的时候,KafkaSpout和SpoutConfig这两个类是必不可少的,值得注意的是后者是前者创建的必要参数,也就是说在创建KafkaSpout对象之前,必须先准备一个SpoutConfig对象。前者是kafka消费者与storm的spout数据源的一个整合类,其目的就是让kafka的消费数据直接作为storm的数据源进行数据处理。而后者则是spout的一个配置类,它需要配置kafka的各种信息,包括节点、端口、主题等等。另外还有一些辅助非强制性配置的信息,在业务所需的情况下也有必要配置,例如LZ今天要分享的scheme配置。

在使用KafkaStorm时需要子类实现Scheme,storm-kafka实现了StringScheme,KeyValueStringScheme等等,大家可以用。我们先来看看scheme接口的源码

    public interface Scheme extends Serializable {  
        public List<Object> deserialize(byte[] ser);  
        public Fields getOutputFields();  
    }  

需要实现反序列化方法和输出fields名称,来看简单StringScheme实现:

public class MyScheme implements Scheme {  
  
    public static final String STRING_SCHEME_KEY = "str";  
  
    public List<Object> deserialize(byte[] bytes) {  
        return new Values(deserializeString(bytes));  
    }  
  
    public static String deserializeString(byte[] string) {  
        try {  
            return new String(string, "UTF-8");  
        } catch (UnsupportedEncodingException e) {  
            throw new RuntimeException(e);  
        }  
    }  
  
    public Fields getOutputFields() {  
        return new Fields(STRING_SCHEME_KEY);  
    }  
}

其实就是直接返回了一个String,在Spout往后发射时就一个字段,其名为“str”,如果采用StringScheme时,大家在Bolt中可以用

    tuple.getStringByField("str")  

这些Scheme主要负责从消息流中解析出所需要的数据。

假设我们现在要把kafka某一个主题的生产数据要获取到作为storm流分析的数据源,我们的生产数据如下

16 91 16777216 0 17 6 7 15 41 46 535 6.158485 1.813451 0.000000 -1068 8496 13572 28 276 0 1597 100 0 1496821304 436028
16 91 16777216 0 17 6 7 15 41 46 785 5.917774 1.716683 0.000000 -1064 8392 13544 109 254 -19 1598 100 0 1496821304 686031
16 91 16777216 0 17 6 7 15 41 47 35 5.090148 1.145671 0.000000 0 0 0 0 0 0 0 0 0 1496821304 935960
16 91 16777216 0 17 6 7 15 41 47 285 6.013670 1.660154 0.000000 -1096 8432 13700 72 314 12 1599 100 0 1496821305 186008
16 91 16777216 0 17 6 7 15 41 47 535 5.637641 1.650586 0.000000 -1080 8468 13468 23 373 -10 1600 100 0 1496821305 436129

以上数据时LZ工作中的部分业务数据,每一行代表一条标签终端数据,以空格分割有25列,每一列代表不同的含义(包括协议类型、球场id、标签id、护腿板id、时间戳、定位精度、加速度、陀螺仪等等),已经提前录入到一个TXT文件中,现在用kafka开启一个生产者模拟数据源实时发送数据(代码查看上一篇),以下主要写storm相关处理逻辑

运行环境

环境还是上一篇storm案例的开发环境
//www.greatytc.com/p/0b70133ee040

jdk1.7
zookeeper-3.4.5
storm-0.9.2
kafka-2.9
redis-3.2.3

确保kafka数据已发送后再编写storm

2017-09-26_111249.png

storm相关代码

(1):空格分割编写bolt切分

KafkaSpoutMain.java

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc kafka整合storm 主程序入口
 **/
public class KafkaSpoutMain {
    // 主题与zk端口(local)
    public static final String TOPIC = "htb_position_test";
    public static final String ZKINFO = "192.168.90.240:2181";

    private static final String HDTAS_SPOUT = "hdtasSpout";
    private static final String HDTAS_DATA_BOLT = "dataBolt";
    private static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
    private static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
    private static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
    private static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
    private static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
    private static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //创建zk主机
        ZkHosts zkHosts = new ZkHosts(ZKINFO);
        //创建spout
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC, "","KafkaSpout");
        //整合kafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        //设置storm数据源为kafka整合storm的kafkaSpout
        topologyBuilder.setSpout(HDTAS_SPOUT, kafkaSpout, 1);

        //流向dataBolt进行空格分割处理(总处理,同时分发给多个bolt)
//      topologyBuilder.setBolt(HDTAS_DATA_BOLT, new DataBolt(), 1).shuffleGrouping(HDTAS_SPOUT);

        Config config = new Config();
        config.setNumWorkers(1);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
            } catch (Exception e) {}
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("HDTAS", config,topologyBuilder.createTopology());
        }
    }
}

KafkaSpoutMain 主要写了各个bolt的运行逻辑,我们可以发现这里只有一个bolt就是我们自定义的DataBolt,由于我们对数据不做业务处理,只做切分处理,所以一个bolt就能看到切分效果就够了,以下是DataBolt的实现

DataBolt.java

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc 总Bolt,对数据进行分割处理
 **/
public class DataBolt extends BaseRichBolt {

    private OutputCollector collector;

    public Map<String,String> map;

    /**
     * 业务操作,数据处理(这里进行分割发送)
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple) {
        String string = new String((byte[]) tuple.getValue(0));

        String[] datas = string.split(" ");
        System.out.println("接收到消息:"+string);
        if(datas.length==25){
            this.collector.emit(new Values(datas[0],datas[1],datas[2],datas[3],datas[4],datas[5],datas[6],datas[7],datas[8],datas[9],
                    datas[10],datas[11],datas[12],datas[13],datas[14],datas[15],datas[16],datas[17],datas[18],datas[9],
                    datas[20],datas[21],datas[22],datas[23],datas[24]));
        }
    }

    /**
     * 初始化方法
     * @param map
     * @param topologyContext
     * @param outputCollector
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * 指定流向,标注流向字段
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(
                new Fields(HdtasInfo.PROTOCOL_TYPE,
                        HdtasInfo.FIELD_ID,
                        HdtasInfo.UWB_ID,
                        HdtasInfo.SIGN_ID,
                        HdtasInfo.YEAR,
                        HdtasInfo.MONTH,
                        HdtasInfo.DAY,
                        HdtasInfo.HOUR,
                        HdtasInfo.MINUTE,
                        HdtasInfo.SECOND,
                        HdtasInfo.MILLISECOND,
                        HdtasInfo.X,
                        HdtasInfo.Y,
                        HdtasInfo.Z,
                        HdtasInfo.A_SPEED_X,
                        HdtasInfo.A_SPEED_Y,
                        HdtasInfo.A_SPEED_Z,
                        HdtasInfo.GYROSCOPE_X,
                        HdtasInfo.GYROSCOPE_Y,
                        HdtasInfo.GYROSCOPE_Z,
                        HdtasInfo.HEART_RATE,
                        HdtasInfo.ELECTRIC,
                        HdtasInfo.CHARGING_STATUS,
                        HdtasInfo.SERVER_ACCEPT_TIME_S,
                        HdtasInfo.SERVER_ACCEPT_TIME_N));
    }
}

我们看到以上DataBolt的实现是用string接收到数据后split切分,然后在通过 public void declareOutputFields(OutputFieldsDeclarer declarer)方法定义好各个列名后下发给下一个bolt,这样的方式完全可以实现我们的业务要求,也很好理解。不过唯一的缺点就是在接收数据时要先自己自定义一个总的数据处理bolt,去切分数据,在从总bolt中下发给其他bolt,这样不但在开发效率上有所降低,而且当数据量复杂时还容易出错。

storm提供了一种简单的方法,spout的scheme配置,在数据从kafka接收后先进行逻辑处理,然后在对各bolt进行分发,以下为相关代码

(2):scheme数据处理

HdtasScheme.java

/**
 * @author lvfang
 * @create 2017-09-25 15:56
 * @desc
 **/
public class HdtasScheme implements Scheme {
    private static final Charset UTF8_CHARSET;

    //协议类型
    public static final String PROTOCOL_TYPE = "protocol_type";
    //场地ID
    public static final String FIELD_ID = "field_id";
    //主设备ID(终端id)
    public static final String UWB_ID = "uwb_id";
    //护腿板ID
    public static final String SIGN_ID = "sign_id";
    // 年 月 日   时 分 秒  毫秒
    public static final String YEAR = "year";
    public static final String MONTH = "month";
    public static final String DAY = "day";
    public static final String HOUR = "hour";
    public static final String MINUTE = "minute";
    public static final String SECOND = "second";
    public static final String MILLISECOND = "millisecond";
    //定位精度  X Y Z
    public static final String X = "x";
    public static final String Y = "y";
    public static final String Z = "z";
    //加速度 X Y Z
    public static final String A_SPEED_X = "a_speed_x";
    public static final String A_SPEED_Y = "a_speed_y";
    public static final String A_SPEED_Z = "a_speed_z";
    //陀螺仪 X Y Z
    public static final String GYROSCOPE_X = "gyroscope_x";
    public static final String GYROSCOPE_Y = "gyroscope_y";
    public static final String GYROSCOPE_Z = "gyroscope_z";
    //心率
    public static final String HEART_RATE = "heart_rate";
    //电池电量
    public static final String ELECTRIC = "electric";
    //电池充电状态  1:充电  0:放电
    public static final String CHARGING_STATUS = "charging_status";
    //Unix时间戳  秒
    public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
    //Unix时间戳  纳秒
    public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";


    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        //流转string
        String input = deserializeString(ser);
        //空格切分
        String[] strs = input.split(" ");
        if (strs.length != 25) {//不符合数据格式,错误数据,则不发送
            return null;
        }
        return new Values(strs);//返回数据
    }

    //序列化string(由于接收的数据是ByteBuffer类型,固要序列化)
    private static String deserializeString(ByteBuffer string) {
        if (string.hasArray()) {
            int base = string.arrayOffset();
            return new String(string.array(), base + string.position(), string.remaining());
        } else {
            return new String(Utils.toByteArray(string), UTF8_CHARSET);
        }
    }

    @Override
    public Fields getOutputFields() {
        //指定列
        return new Fields(PROTOCOL_TYPE, FIELD_ID, UWB_ID, SIGN_ID, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, MILLISECOND, X, Y, Z, A_SPEED_X, A_SPEED_Y, A_SPEED_Z, GYROSCOPE_X, GYROSCOPE_Y, GYROSCOPE_Z, HEART_RATE, ELECTRIC, CHARGING_STATUS, SERVER_ACCEPT_TIME_S, SERVER_ACCEPT_TIME_N);
    }

    static {
        UTF8_CHARSET = Charset.forName("UTF-8");
    }
}

毫无疑问,我们应该先自定义一个scheme,这个scheme就类似于对数据的划分标准以及逻辑,由程序员自己编写,很明显自定义scheme需要实现storm的scheme接口,并重写相关方法,这里我们可以看到重写了public List<Object> deserialize(ByteBuffer ser)方法和 public Fields getOutputFields(),很明显deserialize()方法是进行数据逻辑处理的,我们这里是对数据进行空格分割处理并返回一个Value对象, getOutputFields()方法很明显是定义流向bolt数据各个字段的列名,根据列名称可以获取对应列数据,先来看看scheme的源码

scheme.java

//实现Serializable 接口实现数据的可序列化
public interface Scheme extends Serializable {

    //处理数据的逻辑方法
    List<Object> deserialize(ByteBuffer var1);

    //处理后对数据进行列定义(保证其他bolt就可以从对应列拿到数据)
    Fields getOutputFields();
}

根据scheme的接口方法,我们可以看出scheme就是做了一件事,就是数据处理,他的处理不同之处是在于他不用编写bolt,在所有的bolt之前,以相关业务的格式提前设置了SpoutConfig,我们看看他的bolt逻辑怎么写

KafkaSpoutMain.java

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc kafka整合storm 主程序入口
 **/
public class KafkaSpoutMain {
    
    // 主题与zk端口(test)
//  public static final String TOPIC = "htb_position_test";
//  public static final String ZKINFO = "192.168.1.118:2181/kafka";

    // 主题与zk端口(local)
    public static final String TOPIC = "htb_position_test";
    public static final String ZKINFO = "192.168.90.240:2181";

    private static final String HDTAS_SPOUT = "hdtasSpout";
    private static final String HDTAS_DATA_BOLT = "dataBolt";
    private static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
    private static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
    private static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
    private static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
    private static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
    private static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //创建zk主机
        ZkHosts zkHosts = new ZkHosts(ZKINFO);
        //创建spout
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC, "","KafkaSpout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new HdtasScheme());//设置自定义scheme
        //整合kafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        //设置storm数据源为kafka整合storm的kafkaSpout
        topologyBuilder.setSpout(HDTAS_SPOUT, kafkaSpout, 1);

        // 用自定义Scheme切分数据,与下边的空格切分一个功能
        topologyBuilder.setBolt(HDTAS_DATA_BOLT, new AllDataBolt(), 1).shuffleGrouping(HDTAS_SPOUT);

        Config config = new Config();
        config.setNumWorkers(1);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
            } catch (Exception e) {}
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("HDTAS", config,topologyBuilder.createTopology());
        }
    }
}

bolt接收数据
AllDataBolt.java

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc 总Bolt,对数据进行分割处理
 **/
public class AllDataBolt extends BaseRichBolt {

    private OutputCollector collector;

    public Map<String,String> map;

    /**
     * 业务操作,数据处理(这里进行分割发送)
     * @param input
     */
    @Override
    public void execute(Tuple input) {
        String x = input.getStringByField(HdtasScheme.X);
        String y = input.getStringByField(HdtasScheme.Y);
        String z = input.getStringByField(HdtasScheme.Z);
        System.out.println("接收到消息:"+x + "-" + y + "-" + z);

    }

    /**
     * 初始化方法
     * @param map
     * @param topologyContext
     * @param outputCollector
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * 指定流向,标注流向字段
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //不下发数据,所以不实现
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容