Future系列(CompletableFuture与retrofit)使用和解析

一、在Android中的使用

1.gradle依赖
    implementation "io.reactivex.rxjava2:rxjava:2.0.8"
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'com.squareup.retrofit2:retrofit:2.9.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'

注意,retrofit 2.9.0 已经内置了 java8 的 adapter,所以不需要 adapter-java8 的依赖了
另外,别忘了网络权限

2.定义接口
public interface GitHubService {
    @GET("users/{user}/repos")
    Observable<List<Repo>> listRepos(@Path("user") String user);

    @GET("users/{user}/repos")
    CompletableFuture<List<Repo>> listRepos2(@Path("user") String user);
}

上面的是 rxjava 的用法,下面是 CompletableFuture 的用法。我写在一起是为了比对

3.定义retrofit
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://api.github.com/")
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        GitHubService service = retrofit.create(GitHubService.class);

注意我们这里不需要添加 addCallAdapterFactory 了。因为 2.9.0 的源码里面内置了 CompletableFutureCallAdapterFactory。并且已经默认添加进去了

    List<? extends CallAdapter.Factory> defaultCallAdapterFactories(
            @Nullable Executor callbackExecutor) {
        DefaultCallAdapterFactory executorFactory = new DefaultCallAdapterFactory(callbackExecutor);
        return hasJava8Types
                ? asList(CompletableFutureCallAdapterFactory.INSTANCE, executorFactory)
                : singletonList(executorFactory);
    }

具体代码在 Retrofit.Builder build 的时候 defaultCallAdapterFactories 里面有

4.网络请求
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        CompletableFuture<List<Repo>> future = service.listRepos2("octocat").handle(new BiFunction<List<Repo>, Throwable, List<Repo>>() {
            @Override
            public List<Repo> apply(List<Repo> repos, Throwable throwable) {
                Log.e("handleAsync", Thread.currentThread().getName());//***************(1)
                Log.e("handleAsync", repos.toString());
                return repos;
            }
        });
        try {
            future.get(5, TimeUnit.SECONDS);//这里必须加 try catch***************(2)
            future.thenApply(new Function<List<Repo>, String>() {
                @Override
                public String apply(List<Repo> repos) {
                    Log.e("thenApply2", Thread.currentThread().getName());//前面必须要有get,不然不会输出main
                    return null;
                }
            }).thenApply(new Function<String, String>() {
                @Override
                public String apply(String repos) {
                    Log.e("thenApply2", Thread.currentThread().getName());//**********************(3)
                    return null;
                }
            });
        } catch (Exception e) {
            Log.e("handleAsync", e.toString());
        }

接下来要解释几点
(1)1号注释那里输出啥?
listRepos2 返回 BodyCallAdapter。里面就是熟悉的 call.enqueue。如果没有特别指明线程池,那么肯定是 okhttp 里面自带的线程
所以就算在 handleAsync 里面指明线程池也没用,它只是改变了 apply 函数里面的线程。要想用自己设计的线程池必须要 okhttp 里面自己设定
PS:就算自己设定了线程池,输出的线程名字依然没有变是因为 NamedRunnable 改了名字
(2)为啥一定要get
主线程没有特定的线程池,所以只能用get让后面的方法在main上运行
(3)3号注释输出啥
和上一个操作符指定的线程一致。这一点和rxjava很像

二、原理解析

如果看过上一篇的话就会发现一个很奇怪的现象。在Java中 thenApply 会自动切换到主线程,而Android中和上一个操作符指明的线程一致。接下来就来分析一下为什么
(其实java中如果在 supplyAsync 里面添加了 Thread.sleep 也可以得到和 android 一样的结果,原因不明)
其实有了上面的使用基本也就知道了,CompletableFuture 是在主线程通过阻塞方法 get 来获取到子线程中的值的,根本不存在什么 Handler。严格意义上来讲里面没有“切换”的操作。更何况主线程也不可能通过 Executor 来包装。
那么接下来就大致讲解一下整个流程是如何串起来的。先来段示例代码

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                }
                System.out.println("supplyAsync===" + Thread.currentThread().getName());
                return "supplyAsync";
            }
        }).thenApply(new Function<String, String>() {
            @Override
            public String apply(String t) {
                System.out.println("apply===" + Thread.currentThread().getName());
                return "apply";
            }
        });

在讲解之前先介绍一下数据结构吧

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Completion stack;

    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        volatile Completion next;      // Treiber stack link
    }
}

每个 CompletableFuture 内部有一个 stack(只是名字是 stack,其实还是个链表)。stack 内部有一个链表

supplyAsync
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));

AsyncSupply 是个 Runnable,所以看 run 方法

        public void run() {
            CompletableFuture<T> d; Supplier<? extends T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {//没有结果就等待结果
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }

很简单,先计算结果,然后postComplete

    final void postComplete() {
        CompletableFuture<?> f = this; Completion h;
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {// 从头遍历stack,并更新头元素
                if (t != null) {
                    if (f != this) {
                        pushStack(h);// 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深
                        continue;
                    }
                    h.next = null;// 如果是当前CompletableFuture, 解除头节点与栈的联系
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;//类似于钩子,执行对应 thenApply或者whenComplete的回调
            }
        }
    }

supplyAsync 主要功能就是生成一个新的 CompletableFuture。等待 get 完成并且做点扫尾工作,然后 h.tryFire 下一个 CompletableFuture

thenApply
        CompletableFuture<V> d = newIncompleteFuture();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);//把当前的UniApply 放到next中
            c.tryFire(SYNC);//tryFire下一个
        }

看到了吧,也是新生成 CompletableFuture。和 rxjava 类似。

    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            super(executor, dep, src); this.fn = fn;
        }
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里面计算thenApply的function
                return null;
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);//最后绕来绕去还是会回到 postComplete
        }
    }

很明显,每一步都生成一个新的 CompletableFuture,然后通过 stack 串起来

PS: supplyAsync 最后提到的 扫尾工作 之所以没有展开来讲是因为我也没看懂dep的意思。stack对应的链式调用到时看明白了。如果想了解详细内容,可以参考如下两篇文章
CompletableFuture原理解析
CompletableFuture 原理浅析

2021.04.09
终于在okhttp里面找到了包装handler的方法了,现在就贴出来吧

public class MainThreadExecutor implements Executor {

    private final Handler handler = new Handler(Looper.getMainLooper());

    @Override
    public void execute(@NonNull Runnable command) {
        handler.post(command);
    }
}

如果使用这个的话其实就和rxjava在代码结构上没啥区别,就是原理不一样。CompletableFuture 用到是链表,rxjava是用包一层新的obsevable

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容