消费者轮询通过拉取器(Fetcher)发送拉取请求,拉取器会调用消费者网络客户端的发送方法(send)和网络轮询方法(poll)。在拉取器的层面拉取请求是没有真正发送到服务端的。发送方法只是把请求存在到变量中,真正发送到服务端是调用了消费者网络客户端对象的网络轮询方法(poll)。
消费者网络客户端:ConsumerNetworkClient,对NetworkClient的一层封装。
消费者网络客户端发送方法
// 发送请求,只是把请求暂时存放在unsent变量中
private RequestFuture<ClientResponse> send(Node node, ApiKeys api, short version, AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api, version);
RequestSend send = new RequestSend(node.idString(),
header, request.toStruct());
put(node, new ClientRequest(now, true, send, completionHandler)); // 没有真正发送 client.wakeup();
return completionHandler.future;
}
private void put(Node node, ClientRequest request) {
synchronized (this) {
List<ClientRequest> nodeUnsent = unsent.get(node);
if (nodeUnsent == null) {
nodeUnsent = new ArrayList<>();
unsent.put(node, nodeUnsent);
}
nodeUnsent.add(request);
}
}
保存到变量unsent(Map<Node,List<ClientRequest>>)中的对象是ClientRequest。
ClientRequest相关类图:
消费者网络客户端异步发送请求涉及的相关类说明:
1,异步请求完成处理器(RequestFutureCompletionHandler)
2,异步请求(RequestFuture):客户端调用发送请求的返回值。当异步请求完成时,可以获取异步请求的结果。
3,异步请求监听器(RequestFutureListener):异步请求添加监听器,当异步请求有结果时,调用监听器的onSuccess方法。
以拉取器发送拉取请求为例,看看监听器的使用方式:
1,消费者客户端调用ConsumerNetworkClient发送拉取请求返回异步请求对象(RequestFuture)。同时也创建了异步请求完成处理器(RequestFutureCompletionHandler),RequestFutureCompletionHandler实例持有RequestFuture实例,返回其实也是返回的RequestFutureCompletionHandler持有的RequestFuture实例。
2,在返回的异步请求对象上添加异步请求监听器,监听器会处理拉取的到结果。
3,客户端轮询,在收到拉取请求结果后,调用回调处理器的onComplete方法(RequestFutureCompletionHandler.onComplete(ClientResponse))。
4,触发监听器onSuccess方法(RequestFutureListener.onSuccess(ClientResponse))
调用发送方法(send)返回的是一个异步请求对象:RequestFuture。RequestFuture封装了请求相关的信息,表示异步请求的结果。发送方法中还会创建一个异步请求的完成处理器(RequestFutureCompletionHandler:当请求被服务器处理完成,并返回响应结果给客户端,客户端会根据响应结果执行具体逻辑)
网络客户端轮询到拉取请求结果后的处理流程图:
网络客户端轮询到拉取请求结果后的处理时序图:
组合加适配器模式(compose+adapter)
异步请求(RequestFuture)为客户端提供了调用自定义业务处理逻辑的的入口,除了添加监听器的方式(类似拉取请求),还可以使用组合加适配器模式:监听器+适配器(对客户端响应结果做进行解析成客户想要的数据格式)。所以一句话概括组合加适配器:监听器+转换响应结果成客户想要的数据格式。组合表示组装一个监听器,适配器表示对客户端响应结果进行适配。
普通模式
伪代码(发送拉取请求):为异步请求添加监听器,当请求完成时,会调用监听器的回调方法会对响应进行处理。
client.send(fetchTarget, ApiKeys.FETCH, request)
.addListener(new RequestFutureListener() { // 监听器
public void onSuccess(ClientResponse resp) {
}
}
组合加适配器模式
伪代码(发送列举偏移量请求给分区的主节点):适配器中对响应结果进行适配。在组合方法(RequestFuture.compose())中,创建一个新的异步请求对象S,在旧的异步请求对象T(调用compose方法的异步请求对象)添加监听器,并传递新的异步请求对象S到适配器的onSuccess方法中。在适配器的onSuccess方法中进行消息的转换。
client.send(node, ApiKeys.LIST_OFFSETS, request)
.compose(new RequestFutureAdapter>() { // 适配器
public void onSuccess(ClientResponse response, RequestFuture> future) {
future.complete(); // 新建的异步请求的complete方法 触发调用新建的异步请求的监听器的onSuccess方法
}
});
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
// 在异步请求T里,新建了一个异步请求S
final RequestFuture<S> adapted = new RequestFuture<>();
addListener(new RequestFutureListener<T>() { // 为T旧的异步请求添加监听器
// 客户端轮询到结果时,会调用监听器的回调方法
public void onSuccess(T value) {
adapter.onSuccess(value, adapted); // 调用适配器的回调方法 进行消息转换
}
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted; // 返回新建的异步请求对象S
}
组合+适配器模式的调用流程(列举偏移量请求)
普通模式和组合加适配器模式的区别:
普通模式使用监听器,并将异步请求的结果穿给监听器的回调方法。组合模式的监听器,对异步请求的结果在适配器中做一次转换。
ConsumerNetworkClient.polll(RequestFuture)和ConsumerNetworkClient.poll(timeout)
ConsumerNetworkClient.polll(RequestFuture):必须等到异步请求完成才会结束,在轮询结束后可以获取异步请求的结果。
ConsumerNetworkClient.poll(timeout):不管异步请求有没有完成,都会在给定的超时时间内返回。这时在轮询完成后获取异步请求的结果不一定有结果。
异步请求的链式模式
将另一个异步请求链接起来。
链式调用和组合模式的区别
1,组合模式和链接模式都会为当前异步请求添加一个监听器。
2,组合模式会创建一个新的异步请求,链接模式则传入一个已有的异步请求。
3,组合模式返回新异步请求,链接模式返回当前异步请求,不是返回传入的异步请求。
// 链路模式 @param future 已有的异步请求
public void chain(final RequestFuture future) {
addListener(new RequestFutureListener() {
public void onSuccess(T value) { // value是当前异步请求的结果
// 用当前异步请求的结果作为传入的异步请求的的结果
future.complete(value); // 调用异步请求的完成方法
}
public void onFailure(RuntimeException e) { future.raise(e);
}
}); // 没有返回值,所以调用方还是使用的当前的异步请求对象,而不是传入的的异步请求
}
组合模式会调用新异步请求的complete方法。链式模式会调用传入已有异步请求的complete方法。
组合模式和链式模式的异步请求调用流程区别
组合模式:在当前异步请求完成时,调用当前异步请求的监听器回调,在监听器回调中将新异步对象传给适配器的回调方法。在适配器的回调方法中会调用新异步请求的complete方法,完成新异步请求。即组合模式返回的异步请求(新的)
链式模式:在当前异步请求完成时,调用当前异步请求的监听器回调,在监听器回调中调用传入的异步请求的complete方法,完成传入的异步请求。没有返回值。
链式模式运用
消费者加入消费组就是运用了监听器 + 组合模式 + 链式模式,保证业务逻辑的准确和异步请求的调用顺序。
消费者加入消费组:加入消费组必须完成同步组,加入消费组请求必须比同步组请求先发送,同步组的异步请求完成后加入组的异步请求才能完成。
消费者加入消费组步骤:
1,客户端发送加入组请求JoinGroup,采用组合模式返回加入组的异步请求。
2,在加入组的适配器处理中,发送同步组请求,采用组合模式返回同步组的异步请求。
3,将同步组的异步请求使用链式模式链接上加入组的异步请求,为同步组的异步请求添加一个监听器。
4,当同步组请求收到客户端响应结果,完成同步组的异步请求。
5,调用同步组异步请求的监听器回调方法,完成加入组的异步请求。
6,加入组请求完成,获取加入组异步请求的结果。
伪代码:
第一段:
RequestFuture joinFuture = sendJoinGroupRequest();
client.poll(joinFuture);
ByteBuffer byteBuffer = future.value(); // 第六步:获取JoinGroup异步请求的结果
第二段:
// 采用组合模式发送加入组请求
private RequestFuture sendJoinGroupRequest() {
return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); // 第一步:发送JoinGroup请求
}
第三段:
private class JoinGroupResponseHandler extends RequestFutureAdapter {
public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:JoinGroup的异步请求
SyncGroupRequest syncGroupRequest = new SyncGroupRequest();
RequestFuture syncFuture = sendSyncGroupRequest(syncGroupRequest); // 发送同步组请求
syncFuture.chain(joinFuture); // 第三步:将JoinGroup异步请求链接到SyncGroup异步请求
}
}
第四段:
// 采用组合模式发送同步组请求
private RequestFuture sendSyncGroupRequest(SyncGroupRequest request) {
return client.send(coordinator, ApiKeys.SYNC_GROUP, request) .compose(new SyncGroupResponseHandler()); // 第二步:发送SyncGroup请求
}
第五段:
private class SyncGroupResponseHandler extends RequestFutureAdapter {
public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:SyncGroup的异步请求
future.complete(future) // 第四步:完成SyncGroup的异步请求
}
}
第六段:
public void chain(final RequestFuture future) {
addListener(new RequestFutureListener() {
public void onSuccess(T value) {
future.complete(value); // 第五步:完成JoinGroup异步请求
}
以上重点分析了异步请求的相关调用流程,后面将重点分析消费者网络客户端的轮询。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现