模拟OkHttp的实现原理

[TOC]

记录下异步请求实现思路,

  • [1.Call中:]
  • [2.Dispatcher中:]
  • [3.Call中内部类AsyncCall:]
  • [4.Call中getResponse():
  • [5.Interceptor和chain的实现方式]
  • [6.RetryInterceptor:实现请求重试]
  • [7.HeaderInterceptor:处理Header]
  • [8.ConnectInterceptor:实现连接复用]
    [8.1连接池的实现方式:]
  • [9.CallServiceInterceptor:最核心的拦截器,使用socket实现请求,返回Response对象]
  • [10.请求完成]

GitHub代码地址

使用

  • 创建CatHttpClient对象
  • 创建Request对象
  • 创建Call对象
  • 调用Call.enqueue(Callback)开始运行过程
                CatHttpClient catHttpClient = new CatHttpClient();
                Request request = new Request.Builder()
                        .url("http://www.baidu.com")
                        .get()
                        .build();
                Call call = catHttpClient.newCall(request);
                call.enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, Throwable throwable) {
                        L.e("MainActivity:onFailure:" + throwable.toString());
                    }

                    @Override
                    public void onResponse(Call call, Response response) {
                        L.e("MainActivity:onResponse");
                    }
                });

代码调用过程

1.Call中:

调用了Dispatcher对象的enqueue方法,传入AsyncCall对象,这是一个Runnable

public Call enqueue(Callback callback){
        synchronized (this){
            if(isExecuted){
                throw new IllegalStateException("call has already executed");
            }
            isExecuted = true;
        }
        catHttpClient.dispatcher().enqueue(new AsyncCall(callback));
        return this;
    }

2.Dispatcher中:

AsyncCall(runnable)放入线程池,调用它的run方法

    //最多同时请求
    int maxRequests;
    int maxRequestsPerHost;
    //线程池,发送异步请求
    private ExecutorService executorService;
    //等待执行队列
    private final Deque<Call.AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    //正在执行队列
    private final Deque<Call.AsyncCall> runningAsyncCalls = new ArrayDeque<>();

    public void enqueue(Call.AsyncCall call){
        if(runningAsyncCalls.size() >= maxRequests || runningCallsForHost(call) >= maxRequestsPerHost){
            L.e("Dispatcher:超出最大请求数,放入等待执行队列");
            readyAsyncCalls.add(call);
        }else{
            L.e("Dispatcher:开始执行,放入正在执行队列");
            runningAsyncCalls.add(call);
            executorService().execute(call);
        }
    }

3.Call中内部类AsyncCall:

getResponse()是执行任务,finally中通知Dispatcher任务结束

final class AsyncCall implements Runnable{
        Callback callback;
        public AsyncCall(Callback callback){
            this.callback = callback;
        }

        public String host(){
            return request.url().host;
        }

        @Override
        public void run() {
            L.e("Call.AsyncAll.run方法开始,拦截器链条开始");
            try {
                Response response = getResponse();
                if(isCanceled){
                    callback.onFailure(Call.this,new IOException("call canceled"));
                }else{
                    callback.onResponse(Call.this,response);
                }
            }catch (Exception e){
                callback.onFailure(Call.this,e);
            }finally {
                catHttpClient.dispatcher().finished(this);
            }
        }
    }

4.Call中getResponse():

OkHttp的重点在于拦截器实现的责任链模式,将拦截器放入一个list中,然后通过责任链的方式调用,从CallServiceInterceptor获取Response,再层层返回,每层拦截器,处理对应的业务

private Response getResponse() throws IOException{
        ArrayList<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(catHttpClient.interceptors());
        interceptors.add(new RetryInterceptor());
        interceptors.add(new HeaderInterceptor());
        interceptors.add(new ConnectInterceptor());
        interceptors.add(new CallServiceInterceptor());

        Chain chain = new InterceptorChain(interceptors,this,null,0);
        return chain.proceed();
    }

5.Interceptor和chain的实现方式

  • Chain链条,每个链条中包含他在整个拦截器链的index,Call中任务开始的地方创建了第一个Chain并调用了他的proceed(),传入index=0,proceed()中,获取到index=0的拦截器,调用拦截器的interceptor方法,返回值response作为proceed()的返回值返回,同时创建好链条中下一个Chain,由下一个链条去处理index=1的拦截器.有几个拦截器就有几个Chain.
  • 链中Chain的返回值Response是他处理的Interceptor的返回值,Interceptor的参数是下一个index的Chain,返回值也是下一个Chain的返回值,这样一直往下取得是最后一个拦截器,即实际连网发送请求的CallServiceInterceptor的返回值.
public interface Interceptor {
    Response interceptor(Chain chain) throws IOException;
}
public class InterceptorChain implements Chain {
    ArrayList<Interceptor> interceptors;
    Call call;
    HttpConnection httpConnection;
    int index;
    public InterceptorChain(ArrayList<Interceptor> interceptors, Call call, HttpConnection httpConnection, int index) {
        this.interceptors = interceptors;
        this.call = call;
        this.httpConnection = httpConnection;
        this.index = index;
    } 
    @Override
    public Response proceed(HttpConnection httpConnection) throws IOException {
        Interceptor interceptor = interceptors.get(index);
        Chain chain = new InterceptorChain(interceptors,call,httpConnection,index + 1);
        return interceptor.interceptor(chain);
    }
}

6.RetryInterceptor:实现请求重试

因为责任链的实现方式,所以直接取下一个Chain的返回值,这包含了后面所有拦截器的处理结果,如果有异常,表示请求过程需要重试,直接循环调用下一个Chain的proceed()

public class RetryInterceptor implements Interceptor {
    @Override
    public Response interceptor(Chain chain) throws IOException {
        L.e("RetryInterceptor:interceptor()");
        InterceptorChain interceptorChain = (InterceptorChain) chain;
        Call call = interceptorChain.call;
        int retries = call.client().retries();
        IOException ioException = null;
        for (int i = 0; i < retries; i++) {
            if(call.isCanceled()){
                throw new IOException("call canceled!");
            }
            try{
                return chain.proceed();
            }catch(IOException e){
                ioException = e;
            }
        }
        throw ioException;
    }
}

7.HeaderInterceptor:处理Header

public class HeaderInterceptor implements Interceptor {
    @Override
    public Response interceptor(Chain chain) throws IOException {
        L.e("HeaderInterceptor:interceptor()");
        InterceptorChain interceptorChain = (InterceptorChain) chain;
        Request request = interceptorChain.call.request();
        Map<String, String> headers = request.headers();
        headers.put(HttpCodec.HEAD_HOST, request.url().getHost());
        headers.put(HttpCodec.HEAD_CONNECTION, HttpCodec.HEAD_VALUE_KEEP_ALIVE);
        if (null != request.body()) {
            String contentType = request.body().contentType();
            if (contentType != null) {
                headers.put(HttpCodec.HEAD_CONTENT_TYPE, contentType);
            }
            long contentLength = request.body().contentLength();
            if (contentLength != -1) {
                headers.put(HttpCodec.HEAD_CONTENT_LENGTH, Long.toString(contentLength));
            }
        }
        return chain.proceed();
    }
}

8.ConnectInterceptor:实现连接复用

将每个请求封装成一个对象,包含有url,host,port等信息,一般的做法是,每个请求重复1打开连接-2写入请求-3读取响应-4释放连接这个过程,但经常的,同一个应用中,大部分请求都是对同一个host,port地址的请求,所以可以在步骤3后不进行4,让这个连接存在一段时间,下一次请求直接进行步骤2,避免频繁打开连接。需要注意的是连接的缓存时间和连接需要是长连接Keep-Alive的

public class ConnectInterceptor implements Interceptor {
    @Override
    public Response interceptor(Chain chain) throws IOException {
        L.e("ConnectInterceptor:interceptor()");
        InterceptorChain interceptorChain = (InterceptorChain) chain;
        Request request = interceptorChain.call.request();
        CatHttpClient client = interceptorChain.call.client();
        HttpUrl url = request.url();
        String host = url.getHost();
        int port = url.getPort();

        HttpConnection httpConnection = client.connectionPool().get(host, port);
        if(null == httpConnection){
            L.e("ConnectInterceptor:连接池没有,new HttpConnection()");
            httpConnection = new HttpConnection();
        }else{
            L.e("ConnectInterceptor:从连接池得到HttpConnection");
        }
        httpConnection.setRequest(request);
        Response response = chain.proceed(httpConnection);
        if(response.isKeepAlive()){
            client.connectionPool().put(httpConnection);
        }
        return response;
    }
}

8.1连接池的实现方式:

每个连接保存一个最近使用的时间。这里创建了一个守护线程池用于回收连接,当有请求结束,连接被put到连接池中时,回收线程创建,调用clean(),clean()中判断所有连接的最短缓存时间还有多久以及回收超时的连接,wait后,会继续调用clean()。中间如果被重用,则当又put回来时更新最近使用时间,重新开始过程。clean()中如果连接池回收完了,则回收停止。put()时开始回收程序

public class HttpConnectionPool {
    //长连接最大时间
    private final long keepAliveDuration;
    //复用队列
    private final Deque<HttpConnection> httpConnections = new ArrayDeque<>();
    private boolean isCleanRunning;
    public HttpConnectionPool() {
        this(1, TimeUnit.MINUTES);
    }
    public HttpConnectionPool(long keepAliveDuration, TimeUnit timeUnit) {
        this.keepAliveDuration = timeUnit.toMillis(keepAliveDuration);
    }

    private static final Executor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "catHttp.socket回收线程");
            //守护线程,当java虚拟机中没有非守护线程在运行的时候,java虚拟机会关闭
            thread.setDaemon(true);
            return thread;
        }
    });

    //检测闲置socket并对其进行清理
    private Runnable cleanRunnable = new Runnable() {
        @Override
        public void run() {
            while (true) {
                long waitTime = clean();
                L.e("waitTime = " + waitTime);
                if (waitTime <= 0) {
                    return;
                }
                synchronized (HttpConnectionPool.this) {
                    try {
                        //调用某个对象的wait()方法能让当前线程阻塞,
                        // 并且当前线程必须拥有此对象的monitor(即锁)
                        HttpConnectionPool.this.wait(waitTime);
                    } catch (InterruptedException e) {
                        L.e(e.toString());
                    }
                }
            }
        }
    };

    public void put(HttpConnection connection) {
        L.e("HttpConnectionPool:连接池新增");
        //执行检测清理
        if (!isCleanRunning) {
            L.e("HttpConnectionPool:连接池回收程序开始");
            isCleanRunning = true;
            executor.execute(cleanRunnable);
        }
        httpConnections.add(connection);
    }

    public HttpConnection get(String host, int port) {
        Iterator<HttpConnection> iterator = httpConnections.iterator();
        while (iterator.hasNext()) {
            HttpConnection connection = iterator.next();
            //查连接是否复用( 同样的host )
            if (connection.isSameAddress(host, port)) {
                //正在使用的移出连接池
                iterator.remove();
                L.e("HttpConnectionPool:连接池获取");
                return connection;
            }
        }
        return null;
    }

    private long clean() {
        long now = System.currentTimeMillis();
        long longestIdleDuration = -1;
        synchronized (this) {
            L.e("HttpConnectionPool:连接池清理......");
            for (Iterator<HttpConnection> i = httpConnections.iterator(); i.hasNext(); ) {
                HttpConnection connection = i.next();
                //获得闲置时间 多长时间没使用这个了
                long idleDuration = now - connection.lastUseTime;
                //如果闲置时间超过允许
                if (idleDuration > keepAliveDuration) {
                    connection.closeQuietly();
                    i.remove();
                    L.e("HttpConnectionPool:移出连接池");
                    continue;
                }
                //获得最大闲置时间
                if (longestIdleDuration < idleDuration) {
                    longestIdleDuration = idleDuration;
                }
            }
            //下次检查时间
            if (longestIdleDuration >= 0) {
                return keepAliveDuration - longestIdleDuration;
            } else {
                //连接池没有连接 可以退出
                L.e("HttpConnectionPool:连接池空,连接池回收程序结束");
                isCleanRunning = false;
                return longestIdleDuration;
            }
        }
    }
}

9.CallServiceInterceptor:最核心的拦截器,使用socket实现请求,返回Response对象

public class CallServiceInterceptor implements Interceptor {
    @Override
    public Response interceptor(Chain chain) throws IOException {
        L.e("CallServiceInterceptor:interceptor()");
        InterceptorChain interceptorChain = (InterceptorChain) chain;
        final HttpCodec httpCodec = new HttpCodec();
        HttpConnection connection = interceptorChain.httpConnection;
        InputStream is = connection.call(httpCodec);
        //HTTP/1.1 200 OK 空格隔开的响应状态
        String readLine = httpCodec.readLine(is);

        Map<String, String> headerMap = httpCodec.readHeaders(is);
        //是否保持连接
        boolean isKeepAlive = false;
        if(headerMap.containsKey(HttpCodec.HEAD_CONNECTION)){
            isKeepAlive = headerMap.get(HttpCodec.HEAD_CONNECTION).equalsIgnoreCase(HttpCodec.HEAD_VALUE_KEEP_ALIVE);
        }
        int contentLength = -1;
        if (headerMap.containsKey(HttpCodec.HEAD_CONTENT_LENGTH)) {
            contentLength = Integer.valueOf(headerMap.get(HttpCodec.HEAD_CONTENT_LENGTH));
        }
        //分块编码数据
        boolean isChunked = false;
        if (headerMap.containsKey(HttpCodec.HEAD_TRANSFER_ENCODING)) {
            isChunked = headerMap.get(HttpCodec.HEAD_TRANSFER_ENCODING).equalsIgnoreCase(HttpCodec.HEAD_VALUE_CHUNKED);
        }

        String body = null;
        if(contentLength > 0){
            byte[] bytes = httpCodec.readBytes(is, contentLength);
            body = new String(bytes);
        } else if(isChunked){
            body = httpCodec.readChunked(is);
        }

        String[] split = readLine.split(" ");
        int code = Integer.valueOf(split[1]);
        connection.updateLastUseTime();

        return new Response(code,contentLength,headerMap,body,isKeepAlive);
    }
}

10.请求完成

生成Response后,经过每个拦截器处理后,最终返回到3.Call中内部类AsyncCall.run(),getResponse()返回值中,这里回收Callback的成功失败方法,通知Dispatcher请求完毕

    public void finished(Call.AsyncCall call) {
        L.e("Dispatcher:请求结束 移出正在运行队列,判断是否执行等待队列中的请求");
        synchronized (this) {
            runningAsyncCalls.remove(call);
            //判断是否执行等待队列中的请求
            promoteCalls();
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351