[TOC]
SparkStreaming基于kafka获取数据的方式,主要有俩种,即Receiver和Direct,基于Receiver的方式,是SparkStreaming给我们提供了kafka访问的高层api的封装,而基于Direct的方式,就是直接访问,在SparkSteaming中直接去操作kafka中的数据,不需要前面的高层api的封装。而Direct的方式,可以对kafka进行更好的控制!同时性能也更好。
1.Spark streaming基于kafka以Receiver方式获取数据
实际上做kafka receiver的时候,通过receiver来获取数据,这个时候,kafka receiver是使用的kafka高层次的comsumer api来实现的。receiver会从kafka中获取数据,然后把它存储到我们具体的Executor内存中。然后Spark streaming也就是driver中,会根据这获取到的数据,启动job去处理。
1)在通过kafka receiver去获取kafka的数据,在正在获取数据的过程中,这台机器有可能崩溃了。如果来不及做备份,数据就会丢失,切换到另外一台机器上,也没有相关数据。这时候,为了数据安全,采用WAL的方式。write ahead log,预写日志的方式会同步的将接收到的kafka数据,写入到分布式文件系统中。但是预写日志的方式消耗时间,所以存储时建议Memory_and_Disc2.如果是写到hdfs,会自动做副本。如果是写到本地,这其实有个风险,就是如果这台机器崩溃了,再想恢复过来,这个是需要时间的。
2)我们的kafka receiver接收数据的时候,通过线程或者多线程的方式,kafka中的topic是以partition的方式存在的。sparkstreaming中的kafka receiver接收kafka中topic中的数据,也是通过线程并发的方式去获取的不同的partition,例如用五条线程同时去读取kafka中的topics中的不同的partition数据,这时你这个读取数据的并发线程数,和RDD实际处理数据的并发线程数是没任何关系的。因为获取数据时都还没产生RDD呢。RDD是Driver端决定产生RDD的。
3)默认情况下,一个Executor中是不是只有一个receiver去接收kafka中的数据。那能不能多找一些Executor去更高的并发度,就是使用更多的机器去接收数据,当然可以,基于kafa的api去创建更多的Dstream就可以了。很多的Dstream接收kafka不同topics中的不同的数据,最后你计算的时候,再把他优联就行了。其实这是非常灵活的,因为可以自由的组合。
【代码实战】
producer端:
public class SparkStreamingDataManuallyProducerForKafka extends Thread
{
static String[] channelNames = new String[]{
"Spark","Scala","Kafka","Flink","Hadoop","Storm",
"Hive","Impala","HBase","ML"
};
static String[] actionNames = new String[]{"View", "Register"};
private String topic; //发送给Kafka的数据的类别
private Producer<Integer, String> producerForKafka;
private static String dateToday;
private static Random random;
public SparkStreamingDataManuallyProducerForKafka(String topic){
dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
this.topic = topic;
random = new Random();
Properties conf = new Properties();
conf.put("metadata.broker.list","node03:9092,node04:9092,node05:9092");
conf.put("serializer.class", StringEncoder.class.getName());
producerForKafka = new Producer<Integer, String>
(new ProducerConfig(conf)) ;
}
@Override
public void run() {
int counter = 0;
while(true){
counter++;
String userLog = userlogs();
//System.out.println("product:"+userLog+" ");
producerForKafka.send(new KeyedMessage<Integer, String>
(topic, userLog));
if(0 == counter%500){
counter = 0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main( String[] args )
{
new SparkStreamingDataManuallyProducerForKafka("kfk").start();
}
private static String userlogs() {
StringBuffer userLogBuffer = new StringBuffer("");
int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
long timestamp = new Date().getTime();
Long userID = 0L;
long pageID = 0L;
//随机生成的用户ID
if(unregisteredUsers[random.nextInt(8)] == 1) {
userID = null;
} else {
userID = (long) random.nextInt((int) 2000);
}
//随机生成的页面ID
pageID = random.nextInt((int) 2000);
//随机生成Channel
String channel = channelNames[random.nextInt(10)];
//随机生成action行为
String action = actionNames[random.nextInt(2)];
userLogBuffer.append(dateToday)
.append("\t")
.append(timestamp)
.append("\t")
.append(userID)
.append("\t")
.append(pageID)
.append("\t")
.append(channel)
.append("\t")
.append(action);
// .append("\n");
return userLogBuffer.toString();
}
}
consumer端:
public class SparkStreamingOnKafkaReceiver {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("SparkStreamingOnKafkaReceiver")
.setMaster("local[2]")
.set("spark.streaming.receiver.writeAheadLog.enable","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf,
Durations.seconds(5));
jsc.checkpoint("hdfs://node02:8020/checkpoint");
Map<String, Integer> topicConsumerConcurrency =
new HashMap<String, Integer>();
topicConsumerConcurrency.put("kfk", 1);
JavaPairReceiverInputDStream<String,String> lines =
KafkaUtils.createStream(jsc,
"node02:2181,node03:2181,node04:2181",
"MyFiestConsumerGroup", topicConsumerConcurrency);
JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<Tuple2<String,String>, String>() {
//如果是Scala,由于SAM转换,所以可以写成val words =
//lines.flatMap { line => line.split(" ")}
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Tuple2<String,String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split("\t"));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair
(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey
(new Function2<Integer, Integer, Integer>() {
//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
【结果展示】
-------------------------------------------
Time: 1488815425000 ms
-------------------------------------------
(273,1)
(1148,4)
(1119,2)
(1816,1)
(312,1)
(62,1)
(1184,1)
(1625,1)
(1566,1)
(1488815421523,1)
...
2.sparkStreaming基于kafka的Direct详解
Direct方式特点:
1)Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
2)由于直接操作的是kafka,kafka就相当于你底层的文件系统。这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次。而Receiver的方式则不能保证,因为Receiver和ZK中的数据可能不同步,Spark Streaming可能会重复消费数据,这个调优可以解决,但显然没有Direct方便。而Direct api直接是操作kafka的,
spark streaming自己负责追踪消费这个数据的偏移量或者offset,并且自己保存到checkpoint,所以它的数据一定是同步的,一定不会被重复。即使重启也不会重复,因为checkpoint了,但是程序升级的时候,不能读取原先的checkpoint,面对升级checkpoint无效这个问题,怎么解决呢?升级的时候读取我指定的备份就可以了,即手动的指定checkpoint也是可以的,这就再次完美的确保了事务性,有且仅有一次的事务机制。那么怎么手动checkpoint呢?构建SparkStreaming的时候,有getorCreate这个api,它就会获取checkpoint的内容,具体指定下这个checkpoint在哪就好了。或者如下图:
而如果从checkpoint恢复后,如果数据累积太多处理不过来,怎么办?1)限速2)增强机器的处理能力3)放到数据缓冲池中。
3)由于底层是直接读数据,没有所谓的Receiver,直接是周期性(Batch Intervel)的查询kafka,处理数据的时候,我们会使用基于kafka原生的Consumer api来获取kafka中特定范围(offset范围)中的数据。这个时候,Direct Api访问kafka带来的一个显而易见的性能上的好处就是,如果你要读取多个partition,Spark也会创建RDD的partition,这个时候RDD的partition和kafka的partition是一致的。而Receiver的方式,这2个partition是没任何关系的。这个优势是你的RDD,其实本质上讲在底层读取kafka的时候,kafka的partition就相当于原先hdfs上的一个block。这就符合了数据本地性。RDD和kafka数据都在这边。所以读数据的地方,处理数据的地方和驱动数据处理的程序都在同样的机器上,这样就可以极大的提高性能。不足之处是由于RDD和kafka的patition是一对一的,想提高并行度就会比较麻烦。提高并行度还是repartition,即重新分区,因为产生shuffle,很耗时。这个问题,以后也许新版本可以自由配置比例,不是一对一。因为提高并行度,可以更好的利用集群的计算资源,这是很有意义的。
4)不需要开启wal机制,从数据零丢失的角度来看,极大的提升了效率,还至少能节省一倍的磁盘空间。从kafka获取数据,比从hdfs获取数据,因为zero copy的方式,速度肯定更快。
【代码实战】
public class SparkStreamingOnKafkaDirected {
public static void main(String[] args) {
SparkConf conf = new SparkConf().
setAppName("SparkStreamingOnKafkaDirected")
.setMaster("local");
JavaStreamingContext jsc = new JavaStreamingContext
(conf, Durations.seconds(10));
Map<String, String> kafkaParameters = new HashMap
<String, String>();
kafkaParameters.put("metadata.broker.list",
"node03:9092,node04:9092,node05:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("kfk");
JavaPairInputDStream<String,String> lines =
KafkaUtils.createDirectStream(jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParameters,
topics);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<Tuple2<String,String>, String>() {
//如果是Scala,由于SAM转换,所以可以写成val words =
//lines.flatMap { line => line.split(" ")}
@Override
public Iterable<String> call(Tuple2<String,String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split("\t"));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word)
throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
@Override
public Integer call(Integer v1, Integer v2)
throws Exception {
return v1 + v2;
}
});
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
并行度问题:
并行度和RDD中的partition个数有关系。可以通过repartition增加。
1、receiver模式:并行度与block Interval(默认200ms)有关系,但是建议不低于50ms。
2、direct模式:并行度与kafak中topic的partition数据有关。增加kafka中的topic的partition数量可以提高并行度。
receive模式和direct模式最大不同是消费偏移量管理者不同,一个是zookeeper,另一个是SparkStreaming(checkpoint)
direct模式和receive模式对比:
1.简化并行性:无需创建多个输入Kafka流并且结合它们。 使用directStream,Spark Streaming将创建与要消费的Kafkatopic中partition分区一样多的RDD分区,这将从Kafka并行读取数据。 因此,在Kafka和RDD分区之间存在一对一映射,这更容易理解和调整。
2.效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,该日志进一步复制数据。 这实际上是低效的,因为数据有效地被复制两次 - 一次是Kafka,另一次是写入提前日志。 第二种方法消除了问题,因为没有接收器(zookeeper),因此不需要预写日志。 将元数据信息直接保存在kafka中,可以从Kafka恢复消息。
3.Exactly-once语义:第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。这是传统上消费Kafka数据的方式。虽然这种方法(与预写日志结合)可以确保零数据丢失(即至少一次语义),但是一些记录在一些故障下可能被消耗两次。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用简单的Kafka API,不使用Zookeeper的。偏移由Spark Streaming在其检查点内跟踪。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,所以每个记录被Spark Streaming有效地精确接收一次,尽管失败了。为了实现输出结果的一次性语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。