当前位置: 移动技术网 > IT编程>开发语言>Java > 有了 CompletableFuture,使得异步编程没有那么难了!

有了 CompletableFuture,使得异步编程没有那么难了!

2019年10月06日  | 移动技术网IT编程  | 我要评论

枪械图片大全,江南大学教务处登录,光纤跳线型号

本文导读:

  • 业务需求场景介绍
  • 技术设计方案思考
  • future 设计模式实战
  • completablefuture 模式实战
  • completablefuture 生产建议
  • completablefuture 性能测试
  • completablefuture 使用扩展

1、业务需求场景介绍


不变的东西就是一直在变化中。

想必,大家在闲暇时刻,会经常看视频,经常用的几个 app,比如优酷、爱奇艺、腾讯等。

这些视频 app 不仅仅可以在手机上播放,还能够支持在电视上播放。

在电视终端上播放的 app 是独立发布的版本,跟手机端的 app 是不一样的。

当我们看一部电影时,点击进入某一部电影,就进入到了专辑详情页页面,此时,播放器会自动播放视频。用户在手机上看到的专辑详情页,与电视上看到的专辑详情页,页面样式设计上是不同的。

我们来直观的看一下效果。

手机上的腾讯视频专辑详情页:

手机端专辑详情页

上半部分截图,下面还有为你推荐、明星演员、周边推荐、评论等功能。

相应的,在电视端的专辑详情页展示方式是不一样的。假设产品经理提出一个需求,要求对详情页做个改版。
样式要求如下图所示:

电视端专辑详情页

两个终端的样式对比,在电视端专辑详情页中,包含了很多板块,每个板块横向展示多个内容。

产品的设计上要求是,有的板块内容来源于推荐、有的板块来源于搜索、有的板块来源cms(内容管理系统)。简单理解为,每个板块内容来源不同,来源于推荐、搜索等接口的内容要求是近实时的请求。

2、技术设计方案思考


考虑到产品提的这个需求,其实实现起来并不难。

主要分为了静态数据部分和动态数据部分,对于不经常变化的数据可以通过静态接口获取,对于近乎实时的数据可以通过动态接口获取。

静态接口设计:

专辑本身的属性以及专辑下的视频数据,一般是不经常变化的。
在需求场景介绍中,我截图的是电影频道。如果是电视剧频道,会展示剧集列表(专辑下的所有视频,如第 1 集、第 2 集...),而视频的更新一般是不太频繁的,所以在专辑详情页剧集列表数据就可以从静态接口获取。

静态接口数据生成流程:

静态接口设计

另外一部分,就是需要动态接口来实现,调用第三方接口获取数据,比如推荐、搜索数据。
同时,要求板块与板块之间的内容不允许重复。

动态接口设计:

方案一:

串行调用,即按照每个板块的展示先后顺序,调用相应的第三方接口获取数据。

方案二:

并行调用,即多个板块之间可以并行调用,提高整体接口响应效率。

其实以上两个方案,各有利弊。

方案一串行调用,好处是开发模型简单,按照串行方式依次调用接口,内容数据去重,聚合所有的数据返回给客户端。

但是,接口响应时间依赖于第三方接口的响应时间,通常第三方接口总是不可靠的,可能就会拉高接口整体响应时间,进而导致占用线程时间过长,影响接口整体吞吐量。

方案二并行调用,理论上是可以提高接口的整体响应时间,假设同时调用多个第三方接口,取决于最慢的接口响应时间。

并行调用时,需要考虑到「池化技术」,即不能无限制的在 jvm 进程上创建过多的线程。同时,也要考虑到板块与板块之间的内容数据,要按照产品设计上的先后顺序做去重。

根据这个需求场景,我们选择第二种方案来实现更合适一些。

选择了方案二,我们抽象出如下图所示的简易模型:

简易模型

t1、t2、t3 表示多个板块内容线程。t1 线程先返回结果,t2 线程返回的结果不能与与 t1 线程返回的结果内容重复,t3 线程返回的结果不能与 t1、t2 两个线程返回的结果内容重复。

我们从技术实现上考量,当并行调用多个第三方接口时,需要获取接口的返回结果,首先想到的就是 future ,能够实现异步获取任务结果。

另外,jdk8 提供了 completablefuture 易于使用的获取异步结果的工具类,解决了 future 的一些使用上的痛点,以更优雅的方式实现组合式异步编程,同时也契合函数式编程。

3、future 设计模式实战


future 接口设计:

提供了获取任务结果、取消任务、判断任务状态接口。调用获取任务结果方法,在任务未完成情况下,会导致调用阻塞。

future 接口提供的方法:
```
// 获取任务结果
v get() throws interruptedexception, executionexception;

// 支持超时时间的获取任务结果
v get(long timeout, timeunit unit)
throws interruptedexception, executionexception, timeoutexception;

// 判断任务是否已完成
boolean isdone();

// 判断任务是否已取消
boolean iscancelled();

// 取消任务
boolean cancel(boolean mayinterruptifrunning);
```

通常,我们在考虑到使用 future 获取任务结果时,会使用 threadpoolexecutor 或者 futuretask 来实现功能需求。

threadpoolexecutor、futuretask 与 future 接口关系类图:

future 设计类图

theadpoolexecutor 提供三个 submit 方法:

// 1. 提交无需返回值的任务,runnable 接口 run() 方法无返回值
public future<?> submit(runnable task) {
}

// 2. 提交需要返回值的任务,callable 接口 call() 方法有返回值
public <t> future<t> submit(callable<t> task) {
}

// 3. 提交需要返回值的任务,任务结果是第二个参数 result 对象
public <t> future<t> submit(runnable task, t result) {
}

第 3 个 submit 方法使用示例如下所示:

static string x = "东升的思考";
public static void main(string[] args) throws exception {
    executorservice executor = executors.newfixedthreadpool(1);
    // 创建 result 对象 r
    result r = new result();
    r.setname(x);

    // 提交任务
    future<result> future =
                    executor.submit(new task(r), r);
    result fr = future.get();

    // 下面等式成立
    system.out.println(fr == r);
    system.out.println(fr.getname() == x);
    system.out.println(fr.getnick() == x);
}

static class result {
    private string name;
    private string nick;
    // ... ignore getter and setter 
}

static class task implements runnable {
    result r;

    // 通过构造函数传入 result
    task(result r) {
            this.r = r;
    }

    @override
    public void run() {
            // 可以操作 result
            string name = r.getname();
            r.setnick(name);
    }
}

执行结果都是true。

futuretask 设计实现:

实现了 runnable 和 future 两个接口。实现了 runnable 接口,说明可以作为任务对象,直接提交给 threadpoolexecutor 去执行。实现了 future 接口,说明能够获取执行任务的返回结果。

我们来根据产品的需求,使用 futuretask 模拟两个线程,通过示例实现下功能。
结合示例代码注释理解:

public static void main(string[] args) throws exception {
    // 创建任务 t1 的 futuretask,调用推荐接口获取数据
    futuretask<string> ft1 = new futuretask<>(new t1task());
    // 创建任务 t1 的 futuretask,调用搜索接口获取数据,依赖 t1 结果
    futuretask<string> ft2  = new futuretask<>(new t2task(ft1));
    // 线程 t1 执行任务 ft1
    thread t1 = new thread(ft1);
    t1.start();
    // 线程 t2 执行任务 ft2
    thread t2 = new thread(ft2);
    t2.start();
    // 等待线程 t2 执行结果
    system.out.println(ft2.get());
}

// t1task 调用推荐接口获取数据
static class t1task implements callable<string> {
    @override
    public string call() throws exception {
            system.out.println("t1: 调用推荐接口获取数据...");
            timeunit.seconds.sleep(1);

            system.out.println("t1: 得到推荐接口数据...");
            timeunit.seconds.sleep(10);
            return " [t1 板块数据] ";
    }
}
        
// t2task 调用搜索接口数据,同时需要推荐接口数据
static class t2task implements callable<string> {
    futuretask<string> ft1;

    // t2 任务需要 t1 任务的 futuretask 返回结果去重
    t2task(futuretask<string> ft1) {
         this.ft1 = ft1;
    }

    @override
    public string call() throws exception {
        system.out.println("t2: 调用搜索接口获取数据...");
        timeunit.seconds.sleep(1);

        system.out.println("t2: 得到搜索接口的数据...");
        timeunit.seconds.sleep(5);
        // 获取 t2 线程的数据
        system.out.println("t2: 调用 t1.get() 接口获取推荐数据");
        string tf1 = ft1.get();
        system.out.println("t2: 获取到推荐接口数据:" + tf1);

        system.out.println("t2: 将 t1 与 t2 板块数据做去重处理");
        return "[t1 和 t2 板块数据聚合结果]";
    }
}

执行结果如下:

> task :futuretasktest.main()
t1: 调用推荐接口获取数据...
t2: 调用搜索接口获取数据...
t1: 得到推荐接口数据...
t2: 得到搜索接口的数据...
t2: 调用 t1.get() 接口获取推荐数据
t2: 获取到推荐接口数据: [t1 板块数据] 
t2: 将 t1 与 t2 板块数据做去重处理
[t1 和 t2 板块数据聚合结果] 

小结:

future 表示「未来」的意思,主要是将耗时的一些操作任务,交给单独的线程去执行。从而达到异步的目的,提交任务的当前线程,在提交任务后和获取任务结果的过程中,当前线程可以继续执行其他操作,不需要在那傻等着返回执行结果。

4、completeablefuture 模式实战


对于 future 设计模式,虽然我们提交任务时,不会进入任何阻塞,但是当调用方要获得这个任务的执行结果,还是可能会阻塞直至任务执行完成。

在 jdk1.5 设计之初就一直存在这个问题,发展到 jdk1.8 引入了 completablefuture 才得到完美的增强。

在此期间,google 开源的 guava 工具包提供了 listenablefuture ,用于支持任务完成时支持回调方式,感兴趣的朋友们可以自行查阅研究。

在业务需求场景介绍中,不同板块的数据来源是不同的,并且板块与板块之间是存在数据依赖关系的。

可以理解为任务与任务之间是有时序关系的,而根据 completablefuture 提供的一些功能特性,是非常适合这种业务场景的。

completablefuture 类图:

completablefuture 类图

completablefuture 实现了 future 和 completionstage 两个接口。实现 future 接口是为了关注异步任务什么时候结束,和获取异步任务执行的结果。实现 completionstage 接口,其提供了非常丰富的功能,实现了串行关系、并行关系、汇聚关系等。

completablefuture 核心优势:

1)无需手工维护线程,给任务分配线程的工作无需开发人员关注;

2)在使用上,语义更加清晰明确;

例如:t3 = t1.thencombine(t2, () -> { // dosomething ... } 能够明确的表述任务 3 要等任务 2 和 任务 1完成后才会开始执行。

3)代码更加简练,支持链式调用,让你更专注业务逻辑。

4)方便的处理异常情况

接下来,通过 completablefuture 来模拟实现专辑下多板块数据聚合处理。

代码如下所示:

public static void main(string[] args) throws exception {
    // 暂存数据
    list<string> stashlist = lists.newarraylist();
    // 任务 1:调用推荐接口获取数据
    completablefuture<string> t1 =
                    completablefuture.supplyasync(() -> {
                            system.out.println("t1: 获取推荐接口数据...");
                            sleepseconds(5);
                            stashlist.add("[t1 板块数据]");
                            return "[t1 板块数据]";
                    });
    // 任务 2:调用搜索接口获取数据
    completablefuture<string> t2 =
                    completablefuture.supplyasync(() -> {
                            system.out.println("t2: 调用搜索接口获取数据...");
                            sleepseconds(3);
                            return " [t2 板块数据] ";
                    });
    // 任务 3:任务 1 和任务 2 完成后执行,聚合结果
    completablefuture<string> t3 =
                    t1.thencombine(t2, (t1result, t2result) -> {
                            system.out.println(t1result + " 与 " + t2result + "实现去重逻辑处理");
                            return "[t1 和 t2 板块数据聚合结果]";
                    });
    // 等待任务 3 执行结果
    system.out.println(t3.get(6, timeunit.seconds));
}

static void sleepseconds(int timeout) {
    try {
            timeunit.seconds.sleep(timeout);
    } catch (interruptedexception e) {
            e.printstacktrace();
    }
}

执行结果如下:

> task :completablefuturetest.main()
t1: 获取推荐接口数据...
t2: 调用搜索接口获取数据...
[t1 板块数据] 与  [t2 板块数据] 实现去重逻辑处理
[t1 和 t2 板块数据聚合结果]

上述的示例代码在 idea 中新建个class,直接复制进去,即可正常运行。

** 5、completablefuture 生产建议**


创建合理的线程池:

在生产环境下,不建议直接使用上述示例代码形式。因为示例代码中使用的
completablefuture.supplyasync(() -> {});
创建 completablefuture 对象的 supplyasync() 方法(这里使用的工厂方法模式),底层使用的默认线程池,不一定能满足业务需求。

结合底层源代码来看一下:

// 默认使用 forkjoinpool 线程池
private static final executor asyncpool = usecommonpool ?
       forkjoinpool.commonpool() : new threadpertaskexecutor();

public static <u> completablefuture<u> supplyasync(supplier<u> supplier) {
     return asyncsupplystage(asyncpool, supplier);
}

创建 forkjoinpool 线程池:
默认线程池大小是 runtime.getruntime().availableprocessors() - 1(cpu 核数 - 1),可以通过 jvm 参数 -djava.util.concurrent.forkjoinpool.common.parallelism 设置线程池大小。

jvm 参数上配置 -djava.util.concurrent.forkjoinpool.common.threadfactory 设置线程工厂类;配置 -djava.util.concurrent.forkjoinpool.common.exceptionhandler 设置异常处理类,这两个参数设置后,内部会通过系统类加载器加载 class。

如果所有 completablefuture 都使用默认线程池,一旦有任务执行很慢的 i/o 操作,就会导致所有线程都阻塞在 i/o 操作上,进而影响系统整体性能。

所以,建议大家在生产环境使用时,根据不同的业务类型创建不同的线程池,以避免互相影响

completablefuture 还提供了另外支持线程池的方法。

// 第二个参数支持传递 executor 自定义线程池
public static <u> completablefuture<u> supplyasync(supplier<u> supplier,
                                                       executor executor) {
        return asyncsupplystage(screenexecutor(executor), supplier);
}

自定义线程池,建议参考 「阿里巴巴 java 开发手册」,推荐使用 threadpoolexecutor 自定义线程池,使用有界队列,根据实际业务情况设置队列大小。

线程池大小的设置,在 「java 并发编程实战」一书中,brian goetz 提供了不少优化建议。如果线程池数量过多,竞争 cpu 和内存资源,导致大量时间在上下文切换上。反之,如果线程池数量过少,无法充分利用 cpu 多核优势。

线程池大小与 cpu 处理器的利用率之比可以用下面公式估算:

线程池大小计算公式

异常处理:

completablefuture 提供了非常简单的异常处理 ,如下这些方法,支持链式编程方式。

// 类似于 try{}catch{} 中的 catch{}
public completionstage<t> exceptionally
        (function<throwable, ? extends t> fn);
                
// 类似于 try{}finally{} 中的 finally{},不支持返回结果
public completionstage<t> whencomplete
        (biconsumer<? super t, ? super throwable> action);
public completionstage<t> whencompleteasync
        (biconsumer<? super t, ? super throwable> action);
                
// 类似于 try{}finally{} 中的 finally{},支持返回结果
public <u> completionstage<u> handle
        (bifunction<? super t, throwable, ? extends u> fn);
public <u> completionstage<u> handleasync
        (bifunction<? super t, throwable, ? extends u> fn);

#### 6、completablefuture 性能测试:

循环压测任务数如下所示,每次执行压测,从 1 到 jobnum 数据叠加汇聚结果,计算耗时。
统计维度:completablefuture 默认线程池 与 自定义线程池。
性能测试代码:

// 性能测试代码
arrays.aslist(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17, 30, 50, 100, 150, 200, 300).foreach(offset -> {
                    int jobnum = processors + offset;
                    system.out.println(
                                    string.format("when %s tasks => stream: %s, parallelstream: %s, future default: %s, future custom: %s",
                                                    testcompletablefuturedefaultexecutor(jobnum), testcompletablefuturecustomexecutor(jobnum)));
});

// completablefuture 使用默认 forkjoinpool 线程池
private static long testcompletablefuturedefaultexecutor(int jobcount) {
    list<completablefuture<integer>> tasks = new arraylist<>();
    intstream.rangeclosed(1, jobcount).foreach(value -> tasks.add(completablefuture.supplyasync(completeablefutureperftest::getjob)));

    long start = system.currenttimemillis();
    int sum = tasks.stream().map(completablefuture::join).maptoint(integer::intvalue).sum();
    checksum(sum, jobcount);
    return system.currenttimemillis() - start;
}

// completablefuture 使用自定义的线程池
private static long testcompletablefuturecustomexecutor(int jobcount) {
    threadpoolexecutor threadpoolexecutor = new threadpoolexecutor(200, 200, 5, timeunit.minutes, new arrayblockingqueue<>(100000), new threadfactory() {
            @override
            public thread newthread(runnable r) {
                    thread thread = new thread(r);
                    thread.setname("custom_daemon_completablefuture");
                    thread.setdaemon(true);
                    return thread;
            }
    }, new threadpoolexecutor.callerrunspolicy());

    list<completablefuture<integer>> tasks = new arraylist<>();
    intstream.rangeclosed(1, jobcount).foreach(value -> tasks.add(completablefuture.supplyasync(completeablefutureperftest::getjob, threadpoolexecutor)));

    long start = system.currenttimemillis();
    int sum = tasks.stream().map(completablefuture::join).maptoint(integer::intvalue).sum();
    checksum(sum, jobcount);
    return system.currenttimemillis() - start;
}

测试机器配置:8 核cpu,16g内存

性能测试结果:

性能测试结果

根据压测结果看到,随着压测任务数量越大,使用默认的线程池性能越差。

7、completablefuture 使用扩展:


对象创建:

除前面提到的 supplyasync 方法外,completablefuture 还提供了如下方法:

// 执行任务,completablefuture<void> 无返回值,默认线程池
public static completablefuture<void> runasync(runnable runnable) {
      return asyncrunstage(asyncpool, runnable);
}
// 执行任务,completablefuture<void> 无返回值,支持自定义线程池
public static completablefuture<void> runasync(runnable runnable,
                                                   executor executor) {
        return asyncrunstage(screenexecutor(executor), runnable);
}

我们在 completablefuture 模式实战中,提到了 completablefuture 实现了 completionstage 接口,该接口提供了非常丰富的功能。

completionstage 接口支持串行关系、汇聚 and 关系、汇聚 or 关系。
下面对这些关系的接口做个简单描述,大家在使用时可以去自行查阅 jdk api。
同时,这些关系接口中每个方法都提供了对应的 xxxasync() 方法,表示异步化执行任务。

串行关系:

completionstage 描述串行关系,主要有 thenapply、thenrun、thenaccept 和 thencompose 系列接口。

源码如下所示:

// 对应 u apply(t t) ,接收参数 t并支持返回值 u
public <u> completionstage<u> thenapply(function<? super t,? extends u> fn);
public <u> completionstage<u> thenapplyasync(function<? super t,? extends u> fn);

// 不接收参数也不支持返回值
public completionstage<void> thenrun(runnable action);
public completionstage<void> thenrunasync(runnable action);

// 接收参数但不支持返回值
public completionstage<void> thenaccept(consumer<? super t> action);
public completionstage<void> thenacceptasync(consumer<? super t> action);

// 组合两个依赖的 completablefuture 对象
public <u> completionstage<u> thencompose
        (function<? super t, ? extends completionstage<u>> fn);
public <u> completionstage<u> thencomposeasync
        (function<? super t, ? extends completionstage<u>> fn);

汇聚 and 关系:

completionstage 描述 汇聚 and 关系,主要有 thencombine、thenacceptboth 和 runafterboth 系列接口。

源码如下所示(省略了async 方法):

// 当前和另外的 completablefuture 都完成时,两个参数传递给 fn,fn 有返回值
public <u,v> completionstage<v> thencombine
        (completionstage<? extends u> other,
         bifunction<? super t,? super u,? extends v> fn);

// 当前和另外的 completablefuture 都完成时,两个参数传递给 action,action 没有返回值
public <u> completionstage<void> thenacceptboth
        (completionstage<? extends u> other,
         biconsumer<? super t, ? super u> action);

// 当前和另外的 completablefuture 都完成时,执行 action
public completionstage<void> runafterboth(completionstage<?> other,
                                              runnable action);

汇聚 or 关系:

completionstage 描述 汇聚 or 关系,主要有 applytoeither、accepteither 和 runaftereither 系列接口。

源码如下所示(省略了async 方法):

// 当前与另外的 completablefuture 任何一个执行完成,将其传递给 fn,支持返回值
public <u> completionstage<u> applytoeither
        (completionstage<? extends t> other,
         function<? super t, u> fn);

// 当前与另外的 completablefuture 任何一个执行完成,将其传递给 action,不支持返回值
public completionstage<void> accepteither
        (completionstage<? extends t> other,
         consumer<? super t> action);

// 当前与另外的 completablefuture 任何一个执行完成,直接执行 action
public completionstage<void> runaftereither(completionstage<?> other,
                                                runnable action);

到此,completablefuture 的相关特性都介绍完了。

异步编程慢慢变得越来越成熟,java 语言官网也开始支持异步编程模式,所以学好异步编程还是有必要的。

本文结合业务需求场景驱动,引出了 future 设计模式实战,然后对 jdk1.8 中的 completablefuture 是如何使用的,核心优势、性能测试对比、使用扩展方面做了进一步剖析。

希望对大家有所帮助!

欢迎关注我的公众号,扫二维码关注解锁更多精彩文章,与你一同成长~
java爱好者社区

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网