Kafka源码分析-Consumer(12)--源码经验总结

1.如何不用锁保证“线程封闭”,即一个线程单独占用一个对象?

KafkaConsumer<K, V>用的就是“线程封闭”,实现方法是使用原子类型。如下:

public class KafkaConsumer<K, V> implements Consumer<K, V> {

   
    private static final long NO_CURRENT_THREAD = -1L;

    // currentThread holds the threadId of the current thread accessing KafkaConsumer
    // and is used to prevent multi-threaded access
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // refcount is used to allow reentrant access by the thread who has acquired currentThread
    private final AtomicInteger refcount = new AtomicInteger(0);

  
    public Set<TopicPartition> assignment() {
        acquire();
        try {
         ......
        } finally {
            release();
        }
    }

 
    public Set<String> subscription() {
        acquire();
        try {
           ......
        } finally {
            release();
        }
    }

   
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        acquire();
        try {
              ......
        } finally {
            release();
        }
    }

    
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        acquire();
        try {
              ......
        } finally {
            release();
        }
    }
   
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire();//防止多线程操作。
        try {
              ......
        } finally {
            release();
        }
    }

    /**
     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
     * supported).
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread already has the lock
     */
    private void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        //记录当前线程Id,通过CAS操作完成
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        //记录重入次数
        refcount.incrementAndGet();
    }

    /**
     * Release the light lock protecting the consumer from multi-threaded access.
     */
    private void release() {
        if (refcount.decrementAndGet() == 0)
            //更新线程id
            currentThread.set(NO_CURRENT_THREAD);
    }
}

acquire()

每次操作都会首先调用acquire()方法,AtomicLong类型的currentThread默认是NO_CURRENT_THREAD=-1,这样当没有线程占用对象时,!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)就会把currentThread赋值为当前线程的id。当有线程占用对象时,会抛出异常。同时,compareAndSet支持多个线程通过乐观锁抢夺资源。同时重入数加一。

release()

当执行完某个方法时,线程要释放这个KafkaConsumer对象让其他的线程去使用这个对象,就要重入数减一,并把currentThread设置为NO_CURRENT_THREAD。

2.适配器设计模式的使用:

场景:consumer客户端获取groupCoordinator的流程。
使用了RequestFutureAdapter作为配适器,把它将RequestFuture<ClientResponse>适配成RequestFuture<Void>
RequestFutureCompletionHandler是回调对象,用来对服务端返回的处理。


image.png

ConsumerNetworkClient.RequestFutureCompletionHandler.onComplete()方法的代码如下:

public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {//因连接故障而产生的ClientResponse对象
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                        api, request, correlation, send.destination());
                //调用继承自父类 RequestFuture 的raise()方法
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);//调用继承自父类RequestFuture的complete()方法
            }
        }

从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,还继承了RequestFuture类。RequestFuture是一个泛型类,核心字段如下:

  • isDone:表示当前的请求是否已经完成,无论正常完成还是出现异常,这个字段都是置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空表示出现异常,反之表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners: RequestFutureListener集合,用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常的情况。
    在RequestFuture中有两处设计模式的使用:一处是compose()方法,使用了配适器模式;另一处是chain()方法,使用了责任链的模式。compose()方法相关代码如下:
/**
     *
     * 它将RequestFuture<T>适配成RequestFuture<S>
     * Convert from a request future of one type to another type
     * @param adapter The adapter which does the conversion
     * @param <S> The type of the future adapted to
     * @return The new future
     */
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        final RequestFuture<S> adapted = new RequestFuture<S>();//添加配置器后返回的结果
        //在当前RequestFuture上添加监听器。
        addListener(new RequestFutureListener<T>() {
            @Override
            public void onSuccess(T value) {
                adapter.onSuccess(value, adapted);
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }

下图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播过程。当调用RequestFuture<T>对象的complete()或onFailure方法,然后调用RequestFutureAdapter<T, S>的对应方法,最终调用RequestFuture<S>对象的对应方法。


使用compose()方法适配.jpg

事件传播过程如下:

consumer客户端获取groupCoordinator流程.png

3.模板设计模式的使用:

CoordinatorResponseHandler是一个抽象类,其中有parse()和handle()两个抽象方法,parse()方法对ClientResponse进行解析,得到指定类型的响应;handle()对解析后的响应进行处理。CoordinatorResponseHandler实现了RequestFuture抽象类的onSuccess()方法和onFailure方法。

protected abstract class CoordinatorResponseHandler<R, T>
            extends RequestFutureAdapter<ClientResponse, T> {
        protected ClientResponse response;//待处理的响应
        
        public abstract R parse(ClientResponse response);

        public abstract void handle(R response, RequestFuture<T> future);

        @Override
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            // mark the coordinator as dead
            if (e instanceof DisconnectException)
                coordinatorDead();
            future.raise(e);//调用adapted对象的raise()方法。
        }

        @Override
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
            try {
                this.response = clientResponse;
                R responseObj = parse(clientResponse);//解析clientResponse
                handle(responseObj, future);//调用handle()进行处理
            } catch (RuntimeException e) {
                if (!future.isDone())
                    future.raise(e);
            }
        }
    }

由父类CoordinatorResponseHandler的方法定义操作流程,子类根据需求个性化实现流程中的抽象方法。这种模式的好处是避免每个子类都有一份流程控制的代码。下面是CoordinatorResponseHandler的子类都实现了parse()方法和handle()方法:


CoordinatorResponseHandler.jpg

4.责任链模式的使用:

在消费者收到GroupCoordinator的JoinGroupResponse的处理类时,用了责任链模式用onJoinLeader(joinResponse).chain(future);实现了对future的监听qi

 private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {

        @Override
        public JoinGroupResponse parse(ClientResponse response) {
            return new JoinGroupResponse(response.responseBody());
        }

        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                //步骤一:解析JoinGroupResponse,更新到本地。
                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded = false;//修改了this.rejoinNeeded = false,防止重复发送
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                sensors.joinLatency.record(response.requestLatencyMs());
                if (joinResponse.isLeader()) {//步骤二:判断是否为leader
                    /*
                    注意这里,此future是在前面sendJoinGroupRequest()方法返回的 RequestFuture 对象。
                    在onJoinLeader()和onJoinFollower()方法中,都涉及发送 SyncGroupRequest 逻辑,
                    返回的RequestFuture 标识是SyncGroupRequest的完成情况。这里使用chain()方法,主要实现
                    的功能是:当SyncGroupResponse 处理完成后,再通知这个future对象。
                     */
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
           ......
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355