绍圣--kafka之生产者(五)

在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎么保证有序的喃?

使用RecordAccumulator的mutePartition和unmutePartition方法来配合实现有序性

//记录tp是否还有未完成的RecordBatch,保证一个tp的顺序性,当一个tp对应的RecordBatch要开始发送时,就将此tp加入到muted中,tp对应的RecordBatch发送完成后,删除muted中的tp

private final Set muted;

public void mutePartition(TopicPartition tp) { muted.add(tp); }

public void unmutePartition(TopicPartition tp) { muted.remove(tp); }

RecordAccumulator.ready方法中进行判断(伪代码)

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

if (!readyNodes.contains(leader) && !muted.contains(part)) {}

}

if (!readyNodes.contains(leader) && !muted.contains(part)),如果muted中包含了这个tp,那么即使这个tp对应的leader存在,RecordBatch可以发送也不会去发送它,因为它上一个RecordBatch还没有处理完成。

RecordAccumulator.drain方法中进行判断(伪代码)

public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) {

if (!muted.contains(tp)){}

}

if (!muted.contains(tp))在对RecordAccumulator中的记录进行重新组装的时候,依旧会判断对应的tp是否在muted中。在muted中的依旧不会选择出来发送。

在Sender中的变量:guaranteeMessageOrder:是否保持单个partition的有序性

在KafkaProducer的构造中

this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);

public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }

guaranteeMessageOrder=config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1

我们可以在使用的时候设置max.in.flight.requests.per.connection来设置guaranteeMessageOrder的值。

mutePartition和unmutePartition方法都是在Sender中进行调用

mutePartition在Sender.run中调用

if (guaranteeMessageOrder) {

// 记录将要发送的topicPartition到mute中

for (List batchList : batches.values()) {

for (RecordBatch batch : batchList)

this.accumulator.mutePartition(batch.topicPartition);

}

}

发送的时候,把将要提交的RecordBatch的tp加到muted中。下次再需要发送tp里的RecordBatch的时候,如果muted里面包含了此tp,就不会选择出来发送。

在处理服务端响应的时候,清除muted中的tp

if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);

总结:要保证单partition的有序性,需要配置max.in.flight.requests.per.connection=1。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,026评论 19 139
  • 学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二...
    绍圣阅读 1,125评论 0 3
  • kafka版本为0.10.1.0 大体流程 1:初始化,读取配置,配置metrics等 2:创建 RecordAc...
    xcardata阅读 658评论 0 0
  • 话说上回中,KafkaProducer已经将生产的记录追加到了RecordAccumulator中。那么接下来的事...
    绍圣阅读 928评论 2 1
  • 前面三回在分析生产者时,重点在发送的主流程上:怎么生产,怎么发送,怎么调度。略过了一个重要的环节:Metadata...
    绍圣阅读 742评论 0 0