消费者能发送拉取请求的前提条件是:1,消费者已经连接上了服务端协调者所在的节点;2,消费者必须获取到服务端协调者分配给此消费者的分区。
消费者协调者和服务端的协调者之间是通过心跳来维持关系的:让消费者能联系上协调者,让协调者知道消费者的存在。但是当两边某一方出现问题的时候,会发生什么?
1,消费者没有发送心跳(消费者发生故障),协调者应该知道有消费者离开了消费组,需要对消费组内所有的消费者重新分配分区。
2,服务端协调者发生故障,服务端会自己容错选出一个新的协调者节点来管理消费组。消费者必须等待一定的时间重新询问服务端是否选择出新的协调者,如果还没有选出,就再等一段时间再询问。如果已经选出新的协调者节点,消费者必须重新与其建立连接,并向协调者发送获取分配的分区信息请求。
消费者为了获取协调者分配的分区,每个消费者都要发送加入组请求给协调者。
消费者加入消费组
消费者发送加入消费组请求的方法在:AbstractCoordinator.ensureActiveGroup(),消费者每次轮询操作都会调用该方法,但是并不意味着每次轮询都会发送加入组请求。因为后续发送拉取请求必须有分区,所以加入消费组请求必须采用阻塞的轮询等待异步请求完成。异步请求完成后将分配的分区结果设置到订阅状态中(SubscriptionState)。
AbstractCoordinator.ensureActiveGroup()
public void ensureActiveGroup() {
ensureCoordinatorReady(); // 确保连接上服务端协调者
startHeartbeatThreadIfNeeded(); // 启动心跳发送线程(启动并不一定立即发送心跳,满足一定条件后才会发送心跳)
joinGroupIfNeeded(); // 发送加入组请求
}
void joinGroupIfNeeded() {
// 首先判断是否需要重新加入消费组
// 再看上一次的加入请求完成否:异步请求对象是否为空。不等于NULL表示异步请求未完成(异步请求完成后会对异步请求对象设置为空表示这次发送请求完成)
// while循环是为了确保一定要消费者加入消费组中:发送加入组请求是阻塞的,拿到异步请求的结果,如果不成功,就会进行循环,加入组成功后设置needRejoin()为false
while (needRejoin() || rejoinIncomplete()) {
ensureCoordinatorReady();
// 初始为true,执行一次后更新为false,完成后又设置为true
if (needsJoinPrepare) {
// 如果是定义自动提交偏移量,那么在发送加入组请求之前必须提交本地保存的最新的偏移量 onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
// 初始化JoinGroup请求,并发送该请求,future此异步请求是加入消费组在组合模式中新创建的异步请求
RequestFuture<ByteBuffer> future = initiateJoinGroup();
// 阻塞式的客户端轮询确保异步请求完成后才会返回
// 完成的概念是等待异步请求结果回来并调用callback中的方法,才算是完成 client.poll(future);
// 重置“重新加入消费组是否完成”对象为空
resetJoinGroupFuture();
if (future.succeeded()) { // 加入组请求完成,这一步时,实际上同步组也已经成功了 needsJoinPrepare = true;
// 完成加入
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
} else {
// 异步请求完成,但是有异常,重新发送加入组请求
// 有异常时:while循环中needRejoin()返回true,rejoinIncomplete()返回false,继续执行。
// 在while循环中initiateJoinGroup()在rejoinIncomplete()返回false的情况下,会重新发送加入组请求
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue;
else if (!future.isRetriable())
throw exception; time.sleep(retryBackoffMs);
}
}
}
注意:initiateJoinGroup()返回的加入组请求的异步请求(组合模式中新创建的异步请求对象)。
疑问:什么情况下会出现needRejoin()为false,rejoinIncomplete()为true的情况?
回答:在我看的kafka-0.10.1.0版本中,从代码逻辑来看是不会出现以上情况的,因为client.poll(future)会一直阻塞直到异步请求对象完成。所以都会执行resetJoinGroupFuture()重置逻辑。
加入组请求对象:JoinGroupRequest
public class JoinGroupRequest {
private final String groupId; // 消费组名称
private final String memberId; // 消费者成员编号;消费者初次加入消费组时;此值为:UNKNON_MEMBER,协调者在处理每个消费者的加入组请求的时候,会为每个消费者指定唯一的消费者成员编号,在加入组响应中返回给消费者;后面消费者再次发送加入组请求的时候,memberId就是前面协调者分配的编号
private final String protocolType; // 协议类型 (消费者:consumer,连接器:connect)
private final List<ProtocolMetadata> groupProtocols;
/** * 协议元数据 * */
public static class ProtocolMetadata {
private final String name; // 分区分配器的类名(PartitionAssignor两种分区方式:RoundRobinAssignor循环,RangeAssignor范围)
private final ByteBuffer metadata; // 元数据内容。协议类型为消费者:那么元数据内容是订阅状态对象其中包含:消费订阅的topic
}
}
分区分配算法有两种:循环(RoundRobinAssignor)和范围(RangeAssignor),在调用assign()方法执行分配算法时,必须要两个参数:partitionsPerTopic:Map<String, Integer>:有哪些topic这些topic有多少分区(<topic,3>),subscriptions:Map<String, List<String>>:消费者成员编号,订阅的主题信息。
PartitionAssignor
public interface PartitionAssignor {
Subscription subscription(Set<String> topics); // 每个消费者订阅的主题列表
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); // 只有主消费者会调用assign(),其中subscriptions是所有消费者的订阅信息
void onAssignment(Assignment assignment); // 分配到结果后的回调处理
String name();
/** * 消费者的订阅信息,即订阅了哪些主题 */
class Subscription { private final List<String> topics; private final ByteBuffer userData;}
/** * 消费者的分配结果,即分配了哪些分区 */
class Assignment { private final List<TopicPartition> partitions; private final ByteBuffer userData;}
}
分配方法返回值包含每个消费者的分配结果,分配结果是一个主题分区集合,表示分配给消费者的所有主题分区。
主消费者
在协调者收集完所有的消费者及其订阅信息后,协调者并不执行分区分配算法,而是交给其中一个消费者来执行分区分配,选出的这个消费者叫主消费者(通常协调者会把第一个发送加入组请求的消费者选为主消费者,当主消费者挂掉后,协调者再选择下一个消费者作为主消费者)
使用主消费者来执行分区分配算法而不是协调者本身来执行,这样可以减少协调者的负担,但是也会增加消费者和协调者之间的通信次数:主消费者完成分配后需要把结果同步回协调者,然后协调者再把分配的结果发送给消费者包括主消费者。这样会出现以下问题:
1,协调者如何选择主消费者?
2,主消费者失败,协调者怎么处理?
3,协调者将所有的消费者以及订阅信息,通过加入组请求的响应结果给主消费者,那么其他的发送加入组请求的消费者也应该得到响应结果,此时推送给其他消费者的响应结果是什么喃?
4,在收到响应结果中其实并不包含分区分配的结果(因为这是主消费者还没有计算完成或者说还没有把结果发送给协调者),这是消费者怎么来获取分区分配结果喃?
应对:
1,协调者会选择第一个发送加入组请求的消费者作为主消费者。
2,主消费者实质就是一个普通消费者,所以主消费者和协调者之间还是以心跳的方式来监听对方是否宕机。
3,协调者在收集完成所有的消费者以及订阅信息后,主消费者收到的加入组响应结果中会包含所有的消费者和订阅信息来执行分区分配。而非主消费者里面不包含这些。
4,每个消费者收到加入组响应后,都会发送同步组请求给协调者来获取分区。主消费者在执行完分区分配任务后才会发送同步组请求,非主消费者会立即发送同步组请求。但是这时主消费者还没有将分配的结果发送给协调者,这时非主消费者的同步组请求在服务费会被延迟处理。协调者收到主消费者的同步请求后会将分配结果放在同步组请求响应中,返回给所有的消费者。
加入组请求和同步组请求
由于消费者接收到的加入组请求响应中没有分区信息,所以不能直接完成加入组异步请求,这时要求客户端要发送同步组请求,但是如果加入组请求有异常,就不需要继续发送同步组请求了。消费者这时需要重新发送加入组请求。非主消费者在收到加入组请求的响应后会立即发送同步组请求给协调者,主消费者会执行完分区分配后再发送同步组请求。
注意:收到加入组请求的响应,调用加入组响应处理器的回调方法只是表示收到结果,并不代表加入组的异步请求完成(如果有异常的话,就完成加入组的异步请对象)。收到同步组请求响应结果,调用同步组响应处理器回调方法,这时同步组请求响应结果包含了分配给消费者的分区信息,这时就可以完成同步组请求的异步请求,并一起完成加入组异步请求。这时消费者就可以从加入组的异步请求的结果中获取分区分配结果(代码中组合使用了组合模式和链式模式)。
加入组请求 同步组请求分解流程
疑问:
1,为什么说加入组异步请求成功完成时,同步组异步请求也成功完成了?
2,加入组异步请求是在什么时候完成的?
通过以上的流程的分解可以知道,通过加入组请求异步请求的链接模式,将同步组异步请求的结果设置为加入组异步请求的结果,从而完成加入组的异步请求。即消费者收到同步组响应后会完成同步组的异步请求,再完成加入组的异步请求。
总结一下以上流程:
1,每个消费者都向协调者发送加入组请求,申请加入消费组。
2,协调者接收每个消费者的加入组请求,收集消费组的消费者成员。
3,协调者选择一个消费者作为主消费者(一般是第一个发送加入组请求的消费者)。
4,协调者向发送加入组请求的每个消费者返回响应结果,包含消费者成员信息和订阅信息。
5,步骤三选出的主消费者做分区分配算法,并在计算完成后发送同步组请求给协调者。
6,非主消费者收到包含消费者成员信息和订阅信息的响应结果后不做计算,立即发送同步组请求给协调者。
7,协调者收到主消费者发送的同步组请求后(其中包含分区分配结果),向每一个发送同步组请求的消费者组发生同步组响应(包含每个消费者的分区结果)。
加入组前的准备工作
消费者在加入组过程中会调用onJoinPrepare(),表示在加入组之前要预处理一些事情:
1,执行一次同步提交偏移量:把本地消费的最新偏移量提交到服务端,这样再平衡完成后,消费者从协调者得到的分区偏移量就是最新(该偏移量以前的消息都是已经被消费过的)。
2,触发用户自定义再平衡监听器:客户端可以在再平衡发生时做一些额外的操作,比如把偏移量保存到数据库中等操作。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现