绍圣--kafka之消费者(三)

消费者轮询通过拉取器(Fetcher)发送拉取请求,拉取器会调用消费者网络客户端的发送方法(send)和网络轮询方法(poll)。在拉取器的层面拉取请求是没有真正发送到服务端的。发送方法只是把请求存在到变量中,真正发送到服务端是调用了消费者网络客户端对象的网络轮询方法(poll)。

消费者网络客户端:ConsumerNetworkClient,对NetworkClient的一层封装。

消费者网络客户端发送方法

// 发送请求,只是把请求暂时存放在unsent变量中

private RequestFuture<ClientResponse> send(Node node, ApiKeys api, short version, AbstractRequest request) {

long now = time.milliseconds();

RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();

RequestHeader header = client.nextRequestHeader(api, version);

RequestSend send = new RequestSend(node.idString(),

header, request.toStruct());

put(node, new ClientRequest(now, true, send, completionHandler)); // 没有真正发送 client.wakeup();

return completionHandler.future;

}


private void put(Node node, ClientRequest request) {

synchronized (this) {

List<ClientRequest> nodeUnsent = unsent.get(node);

if (nodeUnsent == null) {

nodeUnsent = new ArrayList<>();

unsent.put(node, nodeUnsent);

}

nodeUnsent.add(request);

}

}

保存到变量unsent(Map<Node,List<ClientRequest>>)中的对象是ClientRequest。

ClientRequest相关类图:

请求对象相关的类

消费者网络客户端异步发送请求涉及的相关类说明:

1,异步请求完成处理器(RequestFutureCompletionHandler)

2,异步请求(RequestFuture):客户端调用发送请求的返回值。当异步请求完成时,可以获取异步请求的结果。

3,异步请求监听器(RequestFutureListener):异步请求添加监听器,当异步请求有结果时,调用监听器的onSuccess方法。

以拉取器发送拉取请求为例,看看监听器的使用方式:

1,消费者客户端调用ConsumerNetworkClient发送拉取请求返回异步请求对象(RequestFuture)。同时也创建了异步请求完成处理器(RequestFutureCompletionHandler),RequestFutureCompletionHandler实例持有RequestFuture实例,返回其实也是返回的RequestFutureCompletionHandler持有的RequestFuture实例。

2,在返回的异步请求对象上添加异步请求监听器,监听器会处理拉取的到结果。

3,客户端轮询,在收到拉取请求结果后,调用回调处理器的onComplete方法(RequestFutureCompletionHandler.onComplete(ClientResponse))。

4,触发监听器onSuccess方法(RequestFutureListener.onSuccess(ClientResponse))

调用发送方法(send)返回的是一个异步请求对象:RequestFuture。RequestFuture封装了请求相关的信息,表示异步请求的结果。发送方法中还会创建一个异步请求的完成处理器(RequestFutureCompletionHandler:当请求被服务器处理完成,并返回响应结果给客户端,客户端会根据响应结果执行具体逻辑)

网络客户端轮询到拉取请求结果后的处理流程图:

网络客户端轮询到拉取请求结果后的处理流程

网络客户端轮询到拉取请求结果后的处理时序图:

网络客户端轮询到拉取请求结果后的处理时序图

组合加适配器模式(compose+adapter)

异步请求(RequestFuture)为客户端提供了调用自定义业务处理逻辑的的入口,除了添加监听器的方式(类似拉取请求),还可以使用组合加适配器模式:监听器+适配器(对客户端响应结果做进行解析成客户想要的数据格式)。所以一句话概括组合加适配器:监听器+转换响应结果成客户想要的数据格式。组合表示组装一个监听器,适配器表示对客户端响应结果进行适配。

普通模式

伪代码(发送拉取请求):为异步请求添加监听器,当请求完成时,会调用监听器的回调方法会对响应进行处理。

client.send(fetchTarget, ApiKeys.FETCH, request)

.addListener(new RequestFutureListener() { // 监听器

public void onSuccess(ClientResponse resp) {

}

}

组合加适配器模式

伪代码(发送列举偏移量请求给分区的主节点):适配器中对响应结果进行适配。在组合方法(RequestFuture.compose())中,创建一个新的异步请求对象S,在旧的异步请求对象T(调用compose方法的异步请求对象)添加监听器,并传递新的异步请求对象S到适配器的onSuccess方法中。在适配器的onSuccess方法中进行消息的转换。

client.send(node, ApiKeys.LIST_OFFSETS, request)

.compose(new RequestFutureAdapter>() { // 适配器

public void onSuccess(ClientResponse response, RequestFuture> future) {

future.complete(); // 新建的异步请求的complete方法 触发调用新建的异步请求的监听器的onSuccess方法

}

});

public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {

// 在异步请求T里,新建了一个异步请求S

final RequestFuture<S> adapted = new RequestFuture<>();

addListener(new RequestFutureListener<T>() { // 为T旧的异步请求添加监听器

// 客户端轮询到结果时,会调用监听器的回调方法

public void onSuccess(T value) {

adapter.onSuccess(value, adapted); // 调用适配器的回调方法 进行消息转换

}

public void onFailure(RuntimeException e) {

adapter.onFailure(e, adapted);

}

});

return adapted; // 返回新建的异步请求对象S

}

组合+适配器模式的调用流程(列举偏移量请求)

组合+适配器模式的调用流程(列举偏移量请求)

普通模式和组合加适配器模式的区别:

普通模式使用监听器,并将异步请求的结果穿给监听器的回调方法。组合模式的监听器,对异步请求的结果在适配器中做一次转换。

ConsumerNetworkClient.polll(RequestFuture)和ConsumerNetworkClient.poll(timeout)

ConsumerNetworkClient.polll(RequestFuture):必须等到异步请求完成才会结束,在轮询结束后可以获取异步请求的结果。

ConsumerNetworkClient.poll(timeout):不管异步请求有没有完成,都会在给定的超时时间内返回。这时在轮询完成后获取异步请求的结果不一定有结果。

异步请求的链式模式

将另一个异步请求链接起来。

链式调用和组合模式的区别

1,组合模式和链接模式都会为当前异步请求添加一个监听器。

2,组合模式会创建一个新的异步请求,链接模式则传入一个已有的异步请求。

3,组合模式返回新异步请求,链接模式返回当前异步请求,不是返回传入的异步请求。

// 链路模式 @param future 已有的异步请求

public void chain(final RequestFuture future) {

addListener(new RequestFutureListener() {

public void onSuccess(T value) { // value是当前异步请求的结果

// 用当前异步请求的结果作为传入的异步请求的的结果

future.complete(value); // 调用异步请求的完成方法

}

public void onFailure(RuntimeException e) { future.raise(e);

}

}); // 没有返回值,所以调用方还是使用的当前的异步请求对象,而不是传入的的异步请求

}

组合模式会调用新异步请求的complete方法。链式模式会调用传入已有异步请求的complete方法。

组合模式和链式模式的异步请求调用流程区别

组合模式:在当前异步请求完成时,调用当前异步请求的监听器回调,在监听器回调中将新异步对象传给适配器的回调方法。在适配器的回调方法中会调用新异步请求的complete方法,完成新异步请求。即组合模式返回的异步请求(新的)

链式模式:在当前异步请求完成时,调用当前异步请求的监听器回调,在监听器回调中调用传入的异步请求的complete方法,完成传入的异步请求。没有返回值。

链式模式运用

消费者加入消费组就是运用了监听器 + 组合模式 + 链式模式,保证业务逻辑的准确和异步请求的调用顺序。

消费者加入消费组:加入消费组必须完成同步组,加入消费组请求必须比同步组请求先发送,同步组的异步请求完成后加入组的异步请求才能完成。

消费者加入消费组步骤:

1,客户端发送加入组请求JoinGroup,采用组合模式返回加入组的异步请求。

2,在加入组的适配器处理中,发送同步组请求,采用组合模式返回同步组的异步请求。

3,将同步组的异步请求使用链式模式链接上加入组的异步请求,为同步组的异步请求添加一个监听器。

4,当同步组请求收到客户端响应结果,完成同步组的异步请求。

5,调用同步组异步请求的监听器回调方法,完成加入组的异步请求。

6,加入组请求完成,获取加入组异步请求的结果。

伪代码:

第一段:

RequestFuture joinFuture = sendJoinGroupRequest();

client.poll(joinFuture);

ByteBuffer byteBuffer = future.value(); // 第六步:获取JoinGroup异步请求的结果

第二段:

// 采用组合模式发送加入组请求

private RequestFuture sendJoinGroupRequest() {

return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); // 第一步:发送JoinGroup请求

}

第三段:

private class JoinGroupResponseHandler extends RequestFutureAdapter {

public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:JoinGroup的异步请求

SyncGroupRequest syncGroupRequest = new SyncGroupRequest();

RequestFuture syncFuture = sendSyncGroupRequest(syncGroupRequest); // 发送同步组请求

syncFuture.chain(joinFuture); // 第三步:将JoinGroup异步请求链接到SyncGroup异步请求

}

}

第四段:

// 采用组合模式发送同步组请求

private RequestFuture sendSyncGroupRequest(SyncGroupRequest request) {

return client.send(coordinator, ApiKeys.SYNC_GROUP, request) .compose(new SyncGroupResponseHandler()); // 第二步:发送SyncGroup请求

}

第五段:

private class SyncGroupResponseHandler extends RequestFutureAdapter {

public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:SyncGroup的异步请求

future.complete(future) // 第四步:完成SyncGroup的异步请求

}

}

第六段:

public void chain(final RequestFuture future) {

addListener(new RequestFutureListener() {

public void onSuccess(T value) {

future.complete(value); // 第五步:完成JoinGroup异步请求

}

以上重点分析了异步请求的相关调用流程,后面将重点分析消费者网络客户端的轮询。

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容

  • 此篇开始进入kafka的另外一侧:消费者。kafka中的消费者比生产者要复杂的多,里面涉及到的消费组,偏移量等概念...
    绍圣阅读 1,902评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,340评论 8 265
  • 1、通过CocoaPods安装项目名称项目信息 AFNetworking网络请求组件 FMDB本地数据库组件 SD...
    阳明先生_X自主阅读 15,969评论 3 119
  • 我们项目组计划每周都开一次讨论会。定一个议题,大家各抒己见。从进公司到今天,组织了两场这样的会议。会议都进行了两个...
    xxwade阅读 132评论 0 1