1.如何不用锁保证“线程封闭”,即一个线程单独占用一个对象?
KafkaConsumer<K, V>用的就是“线程封闭”,实现方法是使用原子类型。如下:
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final long NO_CURRENT_THREAD = -1L;
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// refcount is used to allow reentrant access by the thread who has acquired currentThread
private final AtomicInteger refcount = new AtomicInteger(0);
public Set<TopicPartition> assignment() {
acquire();
try {
......
} finally {
release();
}
}
public Set<String> subscription() {
acquire();
try {
......
} finally {
release();
}
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquire();
try {
......
} finally {
release();
}
}
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
acquire();
try {
......
} finally {
release();
}
}
public ConsumerRecords<K, V> poll(long timeout) {
acquire();//防止多线程操作。
try {
......
} finally {
release();
}
}
/**
* Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
* when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
* supported).
* @throws IllegalStateException if the consumer has been closed
* @throws ConcurrentModificationException if another thread already has the lock
*/
private void acquire() {
ensureNotClosed();
long threadId = Thread.currentThread().getId();
//记录当前线程Id,通过CAS操作完成
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
//记录重入次数
refcount.incrementAndGet();
}
/**
* Release the light lock protecting the consumer from multi-threaded access.
*/
private void release() {
if (refcount.decrementAndGet() == 0)
//更新线程id
currentThread.set(NO_CURRENT_THREAD);
}
}
acquire()
每次操作都会首先调用acquire()方法,AtomicLong类型的currentThread默认是NO_CURRENT_THREAD=-1,这样当没有线程占用对象时,!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)就会把currentThread赋值为当前线程的id。当有线程占用对象时,会抛出异常。同时,compareAndSet支持多个线程通过乐观锁抢夺资源。同时重入数加一。
release()
当执行完某个方法时,线程要释放这个KafkaConsumer对象让其他的线程去使用这个对象,就要重入数减一,并把currentThread设置为NO_CURRENT_THREAD。
2.适配器设计模式的使用:
场景:consumer客户端获取groupCoordinator的流程。
使用了RequestFutureAdapter作为配适器,把它将RequestFuture<ClientResponse>适配成RequestFuture<Void>
RequestFutureCompletionHandler是回调对象,用来对服务端返回的处理。
ConsumerNetworkClient.RequestFutureCompletionHandler.onComplete()方法的代码如下:
public void onComplete(ClientResponse response) {
if (response.wasDisconnected()) {//因连接故障而产生的ClientResponse对象
ClientRequest request = response.request();
RequestSend send = request.request();
ApiKeys api = ApiKeys.forId(send.header().apiKey());
int correlation = send.header().correlationId();
log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
api, request, correlation, send.destination());
//调用继承自父类 RequestFuture 的raise()方法
raise(DisconnectException.INSTANCE);
} else {
complete(response);//调用继承自父类RequestFuture的complete()方法
}
}
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,还继承了RequestFuture类。RequestFuture是一个泛型类,核心字段如下:
- isDone:表示当前的请求是否已经完成,无论正常完成还是出现异常,这个字段都是置为true。
- exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空表示出现异常,反之表示正常完成。
- value:记录请求正常完成时收到的响应,与exception互斥。此字段非空表示正常完成,反之表示出现异常。
- listeners: RequestFutureListener集合,用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常的情况。
在RequestFuture中有两处设计模式的使用:一处是compose()方法,使用了配适器模式;另一处是chain()方法,使用了责任链的模式。compose()方法相关代码如下:
/**
*
* 它将RequestFuture<T>适配成RequestFuture<S>
* Convert from a request future of one type to another type
* @param adapter The adapter which does the conversion
* @param <S> The type of the future adapted to
* @return The new future
*/
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
final RequestFuture<S> adapted = new RequestFuture<S>();//添加配置器后返回的结果
//在当前RequestFuture上添加监听器。
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}
下图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播过程。当调用RequestFuture<T>对象的complete()或onFailure方法,然后调用RequestFutureAdapter<T, S>的对应方法,最终调用RequestFuture<S>对象的对应方法。
事件传播过程如下:
3.模板设计模式的使用:
CoordinatorResponseHandler是一个抽象类,其中有parse()和handle()两个抽象方法,parse()方法对ClientResponse进行解析,得到指定类型的响应;handle()对解析后的响应进行处理。CoordinatorResponseHandler实现了RequestFuture抽象类的onSuccess()方法和onFailure方法。
protected abstract class CoordinatorResponseHandler<R, T>
extends RequestFutureAdapter<ClientResponse, T> {
protected ClientResponse response;//待处理的响应
public abstract R parse(ClientResponse response);
public abstract void handle(R response, RequestFuture<T> future);
@Override
public void onFailure(RuntimeException e, RequestFuture<T> future) {
// mark the coordinator as dead
if (e instanceof DisconnectException)
coordinatorDead();
future.raise(e);//调用adapted对象的raise()方法。
}
@Override
public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
try {
this.response = clientResponse;
R responseObj = parse(clientResponse);//解析clientResponse
handle(responseObj, future);//调用handle()进行处理
} catch (RuntimeException e) {
if (!future.isDone())
future.raise(e);
}
}
}
由父类CoordinatorResponseHandler的方法定义操作流程,子类根据需求个性化实现流程中的抽象方法。这种模式的好处是避免每个子类都有一份流程控制的代码。下面是CoordinatorResponseHandler的子类都实现了parse()方法和handle()方法:
4.责任链模式的使用:
在消费者收到GroupCoordinator的JoinGroupResponse的处理类时,用了责任链模式用onJoinLeader(joinResponse).chain(future);实现了对future的监听qi
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public JoinGroupResponse parse(ClientResponse response) {
return new JoinGroupResponse(response.responseBody());
}
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
//步骤一:解析JoinGroupResponse,更新到本地。
log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
AbstractCoordinator.this.memberId = joinResponse.memberId();
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;//修改了this.rejoinNeeded = false,防止重复发送
AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
sensors.joinLatency.record(response.requestLatencyMs());
if (joinResponse.isLeader()) {//步骤二:判断是否为leader
/*
注意这里,此future是在前面sendJoinGroupRequest()方法返回的 RequestFuture 对象。
在onJoinLeader()和onJoinFollower()方法中,都涉及发送 SyncGroupRequest 逻辑,
返回的RequestFuture 标识是SyncGroupRequest的完成情况。这里使用chain()方法,主要实现
的功能是:当SyncGroupResponse 处理完成后,再通知这个future对象。
*/
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
......