kafka基础

1. kafka架构图

kafka架构图

2. 角色分析

1. Broker

kafka作为一个消息中间件,用于存储和转发消息,可以把它想象成一个中介,股票经纪人就叫做broker。默认端口是9092,生产者和消费者都需要跟这个Broker建立连接才可以实现消息的收发。

2. 消息

客户端之间传输的数据称之为消息, 或者说是记录(record)。请记住,对于kafka来说,不管是消费者还是生产者都是客户端。 在客户端的代码中,Record可以是一个key-value键值对,生产者对应的封装类是ProducerRecord, 消费者对应的封装类是ConsumerRecord。消息在传输的过程中需要序列化,所有需要我们在代码中执行序列化工具。消息在服务端中存储的格式(RecordBatch和Record)。

3. 生产者

我们将发送消息的一方称之为生产者,接收消息的乙方称之为消费者,为了提升消息发送的速率,生产者并不是组条发送消息到broker中,而是批量发送的。多少条发送一次,由配置中的一个参数决定。

props.put("batch.size", 16384);

4. 消费者

一般来说,消费者获取消息存在两种方式,一种是pull, 一种是push。kafka采用的是pull模式。WHY?
\color{red}{在push模式下,如果消息产生的速度远大于消费者消费的速度,消费者会不堪重负,最终挂掉。}
消费者可以控制自己一次消费多少条消息

max.poll.record=500    #默认是500条

5. \color{red}{TOPIC}

生产者和消费者之间每条消息之间是如何关联起来的呢?也就是消费者怎么就知道自己需要消费什么消息?
队列的存在就是解决这个问题的。在kafka里面这个队列就是topic,\color{red}{它是一个逻辑概念}
生产者和Topic,Topic和消费者的关系都是多对多(不建议这么做)。
当生产者发送消息时,没有对应的Topic,这个时候会自动创建Topic。可以通过参数控制

auto.enable.topics.enable=true   #默认时true

6. partition和Cluster

分区其实是一种数据库分片的思想。试想一下,如果一个topic中消息过多,会产生什么样的问题。

  • 不方便横向扩展,通过扩展机器而不是升级硬件扩展。
  • 并发负载,所有的客户端都操作同一个topic,在高并发的场景下,性能瓶颈
    kafka分区概念---partition。一个topic可以划分成多个分区,分区在创建topic的时候指定,每个topic至少有一个分区。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xiong

如果没有指定分区数,默认分区是1个,可通过下述参数修改

num.partitions=1

partition负载的实现。举例说明,Topic有三个分区,生产者发送了9条消息,第一个分区存储了1 4 7, 第二个分区存储了2 5 8,第三个分区存储了3 6 9。这种情况下其实就是负载的一种体现

每个partition都会有一个物理目录。kafka的配置文件下可以配置日志的存储路径,默认存储在/tmp/kafka-logs下,假设topic=xiongTopic, 每个分区的存储目录就是xiongTopic-0、xiongTopic-1.....

\color{red}{与rabbitMq不同的是,Partition中的消息被读取以后不会被删除,kafka是通过一个类似游标的东} \color{red}{西用来记录当前消息读取的位置偏移量信息。同时,同一批消息在一个partition里面是顺序追加写入的。} \color{red}{这里也是kafka吞吐量大的一个重要原因.}

7. 副本机制

如果partition的数据只存储了一份,在发生网络或者硬件故障的时候,该分区的数据会无法访问或者无法恢复了。kafka在0.8版本之后增加了副本机制, 每个partiotion可以有若干个副本,\color{red}{副本必须在不同的broker上}。一般我们说的副本包括其中的主节点。
由replication-factor指定一个Topic的副本数:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partition --topic testxiong

服务端有个参数控制默认的副本数

offsets.topic.replication.factor=3
分区、副本

leader用粉红色标识,follower用绿色标识,leader是由选举得出。
生产者消费者消息传递都是通过leader来操作,follower的数据是通过leader同步过来的。

8. Segment

kafka的数据是放在后缀为.log的文件中,试想一下,kafka的数据在同一个partition中是顺序写入的,我们不断的追加数据,那保存数据的文件就会越来越大,这个时候检索的效率就会越来越低。
所以,kafka这块干脆对partition再次进行了切分,切分出来的单位就就做段(segment),实际上kafka数据的存储是分段的。我们可以在kafka的存储目录下看到这三个文件都是成对出现的:


segment

这其中是一个数据文件,2个索引文件。segment的默认存储大小是1G,可以通过一下参数进行控制。

log.segment.bytes=1073741824

9. Consumer Group

在kafka中,消费者是以消费者组的形式对消息进行接收。每个消费者组都会由一个group id与对应的topic进行绑定。
\color{blue}{注意: 同一个group中的消费者不能消费相同的partition,可以将partition比作一个座位,一个座位最多坐一个人。}

  • 消费者组中,消费者数量比partition数量少的情况下,一个消费者同时消费多个partition。
  • 消费者组中,消费者数量比partition数量多的情况下,存在消费者空闲。

这两种情况都不是效率最高的情况,只有消费者数量和partition数量保持一致才是最好的选择。如果想要消费同一个partition,就需要另一个消费者组来进行。

10. Comsumer Offset

我们前面谈到,在Kafka中消息是顺序写入的,并且消费的消息是不会被删除的。那么,如果消费者突然挂掉,或者进行下次读写时,如何知道自己已经读取了哪些信息,该从何处继续读取消息呢?
既然消息是有序的,那我们就可以给消息进行编号,来唯一标识一条消息。

offset

这里的编号我们就称之为offset,偏移量。offset记录着下一条将要发送给consumer的消息序号。offset的保存是保存在服务端的,并不是保存在ZK上面。

3. Kafka Java开发

生产者:

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.128:9092");
        // 设置key  value序列化的工具
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //设置消息接收确认模式  0 发出就立刻确认, 1 leader接收到就确认  all 所有follower同步完成再确认
        props.put("acks","1");
        // 异常重试次数
        props.put("retries", 3);
        // 设置批量发送数据一次,数据大小,默认16k
        props.put("batch.size",16384);
        // 设置批量发送等待时间
        props.put("linger.ms", 5);
        // 设置客户端缓冲区大小,默认是32M,满了以后也会出发消息发送
        props.put("buffer.memory", 33554432);
        // 获取元数据时生产者的阻塞时间,超时后抛出异常
        props.put("max.block.ms", 3000);

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i=0; i < 100; i ++) {
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

消费者

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.128:9092");
        props.put("group.id", "xiong-group");
        // 是否自动提交偏移量,只有commit之后才更新消费者组的offset
        props.put("enable.auto.commit", "true");
        // 消费者自动提交的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // 从最早的数据开始消费earliest | latest | none
        props.put("auto.offset.reset", "earliest");
        // 设置key  value反序列化的工具
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //订阅队列
        consumer.subscribe(Arrays.asList("mytopic"));
        try{
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s, partition=%s%n",
                            record.offset(), record.key(), record.value(), record.partition());
                }
            }

        } finally {
            consumer.close();
        }
    }
}

查询消费者相关偏移量数据:

./kafka-consumer-groups.sh --bootstrap-server 192.168.182.128:9092 --describe --group xiong-group

4. 消息幂等性

什么叫做消息幂等性?
简单来说就是,消息发送一次的结果和发送多次的结果是一样的。
有时候消息消费失败的情况下,我们可能会采用消息重发的机制。但是生产者有时候是不知道消息是不是真的消费失败时,这时候消息的重发可能会产生消息重复的情况。
kafka实现消息的幂等性是在broker中实现的,而不是消费者端实现,大大的解放了消费者的双手。
如何实现消息的去重?
去重是需要依赖生产者消息的唯一标识的,不然我们没法知道是否是同一条消息,kafka中可以通过如下配置来产生唯一标识,将producer升级成幂等性的producer。

props.put("enable.idempotence", true);

实现机制:

  • PID(Producer ID), 幂等性的生产者每个客户端都有一个唯一的编号。
  • sequence number,幂等性的生产者发送的每条消息都会带sequence number, Server端就是通过这个值来判断消息是否重复。如果server端发现sequence number的值比服务端记录的值要小,那证明这个消息是重复的消息。(同一分区消息顺序写入,之前如果存在sequence number较小的在后面写入,那证明之前肯定已经有相同的消息已经发送过来过了)。

作用范围:

  1. sequence number并不是全局有序,不能保证所有时间上的幂等。只能保证单分区上的幂等。
  2. 单会话上的幂等,这里的会话是指producer进程的一次运行。当producer重启以后就不能保证了。

5. 生产者事务

生产者与事务有关的方法如下:(kafka 0.11版本以后才支持事务)

对象 描述
initTransactions() 初始化事务
beginTransaction() 开启事务
commitTransaction() 提交事务
abortTransaction() 中止事务
sendOffsetsToTransaction() sendOffsetsToTransaction方法是消费者和生产者在同一段代码使用的(从上游接收消息发送给下游),在提交的时候把消费消息的offset发送给consumer Corordinator.

代码示例:

        //事务的前提是消费者的幂等性
        props.put("enable.idempotence", true);
        //设置事务id,唯一
        props.put("transactional.id", UUID.randomUUID().toString());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.initTransactions();
        try{
            producer.beginTransaction();
            for (int i=0; i < 100; i ++) {
                producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
                if (i == 20) {
                    Integer j = 1/0; //制造异常
                }
            }
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(100), Integer.toString(100)));
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
        }
        producer.close();

kafka分布式事务的实现:

  1. 生产者的消息会分区,所以这里的事务属于分布式事务。kafka采用的是2PC提交。如果大家都可以commit就提交,否则就abort;
  2. 2PC的情况下,需要一个协调者,在Kafka中这个角色叫做Transaction Coordinator。
  3. 事务管理必须有事务日志来记录事务的状态,以便在Coordinator以外挂掉以后继续处理原来的事务。事务日志的存储类似于消费者offset的存储,kafka使用了一个特殊topic--transaction_state来记录事务的状态信息。
  4. 如果生产者挂了,事务要在重启以后继续处理就需要有一个唯一的事务id来找到对应的事务,这个就是transaction.id。配置了transaction.id,此时生产者必须是幂等性的生产者。事务id相同的生产者可以继续处理原来的事务。
事务处理

步骤描述:
A: 生产者通过initTransactions Api向coordinator注册事务id。
B: Corrdinator记录事务日志
C: 生产者将消息写入目标分区
D: 分区域Coordinator的交互,当事务完成以后消息的状态应该是已提交。这时候消费者才能消费

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容