当前位置: 移动技术网 > IT编程>开发语言>Java > Java 8原生API也可以开发响应式代码?

Java 8原生API也可以开发响应式代码?

2019年08月11日  | 移动技术网IT编程  | 我要评论

asphalt-automobile-automotive-1172105.jpg

前段时间工作上比较忙,这篇文章一直没来得及写,本文是阅读《java8实战》的时候,了解到java 8里已经提供了一个异步非阻塞的接口(completablefuture),可以实现简单的响应式编程的模式,因此用这篇文章做个梳理。我是带着下面这几个问题去学习completablefuture这个接口的,

  1. completablefuture是为了解决什么问题而设计的?
  2. 它的使用场景是什么?开源软件中有实战使用案例吗?
  3. completablefuture的常用api都有哪些?如何使用?
  4. completablefuture和rxjava有什么不同?

这篇文章梳理下来,基本上可以回答前面四个问题,ok,我们进入正文。

基本概念

图片摘自dubbo官方博客
rpc(远程方法调用)的四种方式有:oneway、sync、future和callback,在dubbo或bolt这类通信框架中,默认使用的是sync模式(同步+阻塞),future和callback都属于异步模式,不过future模式在get的时候会阻塞,callback模式则不需要等待结果,有结果后服务端会回调请求方。

异步调用这类模式,比较适合的场景是io密集型场景,要执行很多远程调用的任务,并且这些调用耗时可能比较久。以openwrite中的一个case为例:我发布一篇文章,需要给几个不同的写作平台创建文章,这时候我不希望这个过程是顺序的,就比较适合用异步调用模式。

future模式除了在get()调用的时候会阻塞外,还有其他的局限性,例如:没有使用java lambda表达式的优势,对一连串的异步调用可以支持,但是写出来的代码会比较复杂。

completablefuture的常用api

阅读completablefuture的api的时候,我有一个体会——completablefuture之于future,除了增加了回调这个最重要的特性,其他的特性有点像stream对于集合迭代的增强。

使用completablefuture,我们可以像stream一样使用一部调用,可以处理一些级联的异步调用(类似于stream里的flatmap)、可以过滤一些无用的异步调用(anyof、allof)。

下面这张图是我按照自己的理解,梳理除了completablefuture常见的api,阅读的时候需要注意下面几个点:

  1. 把握几个大的分类:创建completablefuture、获取completablefuture的执行结果、主动结束completablefuture、异步调用任务的组合处理;
  2. 看着方法多,但是有规律可循,例如apply字样的接口,传入的方法参数都是有返回值的;
  3. 带either字样的,都是多个异步任务有一个满足条件即可的;
  4. 带executor方法的,都表示该方法可以用自定义的线程池来优化性能。

completablefuture的api

dubbo项目中的使用案例

dubbo对于异步化的支持起始在2.6.x中就有提供,是在发布bean的时候加个属性配置——async=true,然后利用上下文将异步标识一层层传递下去。在之前的公司中有一次排查dubbo(当时我们用的是dubbox)异步调用的问题,最后查到的原因就是多个异步调用,上下文里的信息串了。

dubbo 2.7 中使用了 jdk1.8 提供的 completablefuture 原生接口对自身的异步化做了改进。completablefuture 可以支持 future 和 callback 两种调用方式。在dubbo最新的master代码中,我知道了dubbo的异步结果的定义,它的类图如下,可以看出asyncrpcresult是一个completablefuture接口的实现。

asyncrpcresult.png

实战demo

通过下面的例子,可以看出completablefuture的最大好处——callback特性。首先定义一个接口,其中包括同步接口和该接口的异步版本。

public interface asyncinterfaceexample {

    string computesomethine();

    completablefuture<string> computesomethingasync();
}

然后定义该接口的实现类,可以看出,如果要讲现有的同步接口异步化,是比较容易的;

public class asyncinterfaceexampleimpl implements asyncinterfaceexample {

    @override
    public string computesomethine() {
        try {
            thread.sleep(2000);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
        return "hello, world";
    }

    @override
    public completablefuture<string> computesomethingasync() {
        return completablefuture.supplyasync(this::computesomethine);
    }
}

然后看下我们的测试case,如下:

public class asyncinterfaceexampletest {

    private static string getotherthing() {
        try {
            thread.sleep(2000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }

        return "other";
    }

    public static void main(string[] args) {
        asyncinterfaceexample asyncinterfaceexample = new asyncinterfaceexampleimpl();

        //case1 同步调用
        long start = system.currenttimemillis();
        string something = asyncinterfaceexample.computesomethine();
        string other = getotherthing();
        system.out.println("cost:" + (system.currenttimemillis() - start) + "  result:" + something + other);

        //case2 异步调用,使用回调
        start = system.currenttimemillis();
        completablefuture<string> somethingfuture = asyncinterfaceexample.computesomethingasync();
        other = getotherthing();

        long finalstart = start;
        string finalother = other;
        somethingfuture.whencomplete((returnvalue, exception) -> {
            if (exception == null) {
                system.out.println(
                    "cost:" + (system.currenttimemillis() - finalstart) + "  result:" + returnvalue + finalother);
            } else {
                exception.printstacktrace();
            }
        });
    }
}

上面这个案例的执行结果如下图所示:
执行结果***
本号(javaadu)专注于后端技术、jvm问题排查和优化、java面试题、个人成长和自我管理等主题,为读者提供一线开发者的工作和成长经验,期待你能在这里有所收获。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网