Spark Streaming基于kafka获取数据

[TOC]
SparkStreaming基于kafka获取数据的方式,主要有俩种,即Receiver和Direct,基于Receiver的方式,是SparkStreaming给我们提供了kafka访问的高层api的封装,而基于Direct的方式,就是直接访问,在SparkSteaming中直接去操作kafka中的数据,不需要前面的高层api的封装。而Direct的方式,可以对kafka进行更好的控制!同时性能也更好。

1.Spark streaming基于kafka以Receiver方式获取数据
实际上做kafka receiver的时候,通过receiver来获取数据,这个时候,kafka receiver是使用的kafka高层次的comsumer api来实现的。receiver会从kafka中获取数据,然后把它存储到我们具体的Executor内存中。然后Spark streaming也就是driver中,会根据这获取到的数据,启动job去处理。

image.png

1)在通过kafka receiver去获取kafka的数据,在正在获取数据的过程中,这台机器有可能崩溃了。如果来不及做备份,数据就会丢失,切换到另外一台机器上,也没有相关数据。这时候,为了数据安全,采用WAL的方式。write ahead log,预写日志的方式会同步的将接收到的kafka数据,写入到分布式文件系统中。但是预写日志的方式消耗时间,所以存储时建议Memory_and_Disc2.如果是写到hdfs,会自动做副本。如果是写到本地,这其实有个风险,就是如果这台机器崩溃了,再想恢复过来,这个是需要时间的。

2)我们的kafka receiver接收数据的时候,通过线程或者多线程的方式,kafka中的topic是以partition的方式存在的。sparkstreaming中的kafka receiver接收kafka中topic中的数据,也是通过线程并发的方式去获取的不同的partition,例如用五条线程同时去读取kafka中的topics中的不同的partition数据,这时你这个读取数据的并发线程数,和RDD实际处理数据的并发线程数是没任何关系的。因为获取数据时都还没产生RDD呢。RDD是Driver端决定产生RDD的。

3)默认情况下,一个Executor中是不是只有一个receiver去接收kafka中的数据。那能不能多找一些Executor去更高的并发度,就是使用更多的机器去接收数据,当然可以,基于kafa的api去创建更多的Dstream就可以了。很多的Dstream接收kafka不同topics中的不同的数据,最后你计算的时候,再把他优联就行了。其实这是非常灵活的,因为可以自由的组合。

【代码实战】

producer端:

public class SparkStreamingDataManuallyProducerForKafka extends Thread
{
   static String[] channelNames = new  String[]{
    "Spark","Scala","Kafka","Flink","Hadoop","Storm",
    "Hive","Impala","HBase","ML"
   };
   static String[] actionNames = new String[]{"View", "Register"};
   private String topic; //发送给Kafka的数据的类别
   private Producer<Integer, String> producerForKafka;
   
   private static String dateToday;
   private static Random random;
   public SparkStreamingDataManuallyProducerForKafka(String topic){
    dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
       this.topic = topic;
       random = new Random();
       Properties conf = new Properties();
       conf.put("metadata.broker.list","node03:9092,node04:9092,node05:9092");
       conf.put("serializer.class",  StringEncoder.class.getName());
       producerForKafka = new Producer<Integer, String>
       (new ProducerConfig(conf)) ;
   }  
   @Override
   public void run() {
    int counter = 0;
    while(true){
        counter++;
        String userLog = userlogs();
        //System.out.println("product:"+userLog+"   ");
        producerForKafka.send(new KeyedMessage<Integer, String>
        (topic, userLog));
        if(0 == counter%500){
            counter = 0;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        }
    }
   public static void main( String[] args )
   {
    new SparkStreamingDataManuallyProducerForKafka("kfk").start();
   }
private static String userlogs() {  
    StringBuffer userLogBuffer = new StringBuffer("");
    int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
    long timestamp = new Date().getTime();
        Long userID = 0L;
        long pageID = 0L;
        
        //随机生成的用户ID 
        if(unregisteredUsers[random.nextInt(8)] == 1) {
            userID = null;
        } else {
            userID = (long) random.nextInt((int) 2000);
        }           
        //随机生成的页面ID
        pageID =  random.nextInt((int) 2000);
        
        //随机生成Channel
        String channel = channelNames[random.nextInt(10)];
        
        //随机生成action行为
        String action = actionNames[random.nextInt(2)];
        
        
        userLogBuffer.append(dateToday)
                    .append("\t")
                    .append(timestamp)
                    .append("\t")
                    .append(userID)
                    .append("\t")
                    .append(pageID)
                    .append("\t")
                    .append(channel)
                    .append("\t")
                    .append(action);
//                      .append("\n");              
    return userLogBuffer.toString();    
}
}

consumer端:

public class SparkStreamingOnKafkaReceiver {
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("SparkStreamingOnKafkaReceiver")
            .setMaster("local[2]")
        .set("spark.streaming.receiver.writeAheadLog.enable","true");    
    JavaStreamingContext jsc = new JavaStreamingContext(conf, 
            Durations.seconds(5));
        jsc.checkpoint("hdfs://node02:8020/checkpoint");
    Map<String, Integer> topicConsumerConcurrency =
            new HashMap<String, Integer>();
    topicConsumerConcurrency.put("kfk", 1);
    
    JavaPairReceiverInputDStream<String,String> lines = 
            KafkaUtils.createStream(jsc,
            "node02:2181,node03:2181,node04:2181", 
            "MyFiestConsumerGroup", topicConsumerConcurrency);
    
    
    JavaDStream<String> words = lines.flatMap(new
            FlatMapFunction<Tuple2<String,String>, String>() { 
        //如果是Scala,由于SAM转换,所以可以写成val words = 
        //lines.flatMap { line => line.split(" ")}
        private static final long serialVersionUID = 1L;
        @Override
        public Iterable<String> call(Tuple2<String,String> tuple) 
                throws Exception {
            return Arrays.asList(tuple._2.split("\t"));
        }
    });
  
    JavaPairDStream<String, Integer> pairs = words.mapToPair
            (new PairFunction<String, String, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
    });
 
    JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey
            (new Function2<Integer, Integer, Integer>() { 
    //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    wordsCount.print();     
    jsc.start();
    jsc.awaitTermination();
    jsc.close();
}
}

【结果展示】

-------------------------------------------
Time: 1488815425000 ms
-------------------------------------------
(273,1)
(1148,4)
(1119,2)
(1816,1)
(312,1)
(62,1)
(1184,1)
(1625,1)
(1566,1)
(1488815421523,1)
...

2.sparkStreaming基于kafka的Direct详解

Direct方式特点:

1)Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。

2)由于直接操作的是kafka,kafka就相当于你底层的文件系统。这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次。而Receiver的方式则不能保证,因为Receiver和ZK中的数据可能不同步,Spark Streaming可能会重复消费数据,这个调优可以解决,但显然没有Direct方便。而Direct api直接是操作kafka的,

spark streaming自己负责追踪消费这个数据的偏移量或者offset,并且自己保存到checkpoint,所以它的数据一定是同步的,一定不会被重复。即使重启也不会重复,因为checkpoint了,但是程序升级的时候,不能读取原先的checkpoint,面对升级checkpoint无效这个问题,怎么解决呢?升级的时候读取我指定的备份就可以了,即手动的指定checkpoint也是可以的,这就再次完美的确保了事务性,有且仅有一次的事务机制。那么怎么手动checkpoint呢?构建SparkStreaming的时候,有getorCreate这个api,它就会获取checkpoint的内容,具体指定下这个checkpoint在哪就好了。或者如下图:

image.png

而如果从checkpoint恢复后,如果数据累积太多处理不过来,怎么办?1)限速2)增强机器的处理能力3)放到数据缓冲池中。

3)由于底层是直接读数据,没有所谓的Receiver,直接是周期性(Batch Intervel)的查询kafka,处理数据的时候,我们会使用基于kafka原生的Consumer api来获取kafka中特定范围(offset范围)中的数据。这个时候,Direct Api访问kafka带来的一个显而易见的性能上的好处就是,如果你要读取多个partition,Spark也会创建RDD的partition,这个时候RDD的partition和kafka的partition是一致的。而Receiver的方式,这2个partition是没任何关系的。这个优势是你的RDD,其实本质上讲在底层读取kafka的时候,kafka的partition就相当于原先hdfs上的一个block。这就符合了数据本地性。RDD和kafka数据都在这边。所以读数据的地方,处理数据的地方和驱动数据处理的程序都在同样的机器上,这样就可以极大的提高性能。不足之处是由于RDD和kafka的patition是一对一的,想提高并行度就会比较麻烦。提高并行度还是repartition,即重新分区,因为产生shuffle,很耗时。这个问题,以后也许新版本可以自由配置比例,不是一对一。因为提高并行度,可以更好的利用集群的计算资源,这是很有意义的。

4)不需要开启wal机制,从数据零丢失的角度来看,极大的提升了效率,还至少能节省一倍的磁盘空间。从kafka获取数据,比从hdfs获取数据,因为zero copy的方式,速度肯定更快。

【代码实战】

public class SparkStreamingOnKafkaDirected {
    public static void main(String[] args) {    
        SparkConf conf = new SparkConf().
                setAppName("SparkStreamingOnKafkaDirected")
                .setMaster("local");    
        JavaStreamingContext jsc = new JavaStreamingContext
                (conf, Durations.seconds(10));  
        Map<String, String> kafkaParameters = new HashMap
                <String, String>();
        kafkaParameters.put("metadata.broker.list", 
                "node03:9092,node04:9092,node05:9092"); 
        HashSet<String> topics = new HashSet<String>();
        topics.add("kfk");
        JavaPairInputDStream<String,String> lines = 
                KafkaUtils.createDirectStream(jsc,
                String.class, 
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParameters,
                topics);    
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<Tuple2<String,String>, String>() { 
                    //如果是Scala,由于SAM转换,所以可以写成val words = 
                    //lines.flatMap { line => line.split(" ")}
            @Override
            public Iterable<String> call(Tuple2<String,String> tuple) 
                    throws Exception {
                return Arrays.asList(tuple._2.split("\t"));
            }
        });     
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) 
                    throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
            
        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() { 
    //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
            
            @Override
            public Integer call(Integer v1, Integer v2) 
                    throws Exception {
                return v1 + v2;
            }
        }); 
        wordsCount.print(); 
        jsc.start();    
        jsc.awaitTermination();
        jsc.close();
    }
}

并行度问题:

并行度和RDD中的partition个数有关系。可以通过repartition增加。

1、receiver模式:并行度与block Interval(默认200ms)有关系,但是建议不低于50ms。

2、direct模式:并行度与kafak中topic的partition数据有关。增加kafka中的topic的partition数量可以提高并行度。

receive模式和direct模式最大不同是消费偏移量管理者不同,一个是zookeeper,另一个是SparkStreaming(checkpoint)

direct模式和receive模式对比:

1.简化并行性:无需创建多个输入Kafka流并且结合它们。 使用directStream,Spark Streaming将创建与要消费的Kafkatopic中partition分区一样多的RDD分区,这将从Kafka并行读取数据。 因此,在Kafka和RDD分区之间存在一对一映射,这更容易理解和调整。

2.效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,该日志进一步复制数据。 这实际上是低效的,因为数据有效地被复制两次 - 一次是Kafka,另一次是写入提前日志。 第二种方法消除了问题,因为没有接收器(zookeeper),因此不需要预写日志。 将元数据信息直接保存在kafka中,可以从Kafka恢复消息。

3.Exactly-once语义:第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。这是传统上消费Kafka数据的方式。虽然这种方法(与预写日志结合)可以确保零数据丢失(即至少一次语义),但是一些记录在一些故障下可能被消耗两次。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用简单的Kafka API,不使用Zookeeper的。偏移由Spark Streaming在其检查点内跟踪。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,所以每个记录被Spark Streaming有效地精确接收一次,尽管失败了。为了实现输出结果的一次性语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355