当前位置: 移动技术网 > IT编程>开发语言>Java > 0318 guava并发工具

0318 guava并发工具

2020年03月19日  | 移动技术网IT编程  | 我要评论

image.png


并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了future接口,即 listenablefuture (可以监听的future)。
我强烈建议你在你的所有代码里使用listenablefuture去替代future,原因如下:

  • 很多的futures 类的方法需要它。(futures工具类使用)
  • 它比后来改造为listenablefutrue更简单。(早点使用比重构更简单)
  • 工具方法的提供者不需要提供future和listenablefuture方法的变体。(不需要兼容两套)


接口

一个传统的futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。
一个future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。


一个listenablefuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。
这个简单的增强让高效支持多种操作成为可能。而future接口并不能支持。


listenblefuture中添加的基本操作是
addlistener(runnable , executor ),
它指出了当未来计算完成时,指定的runnable会在指定的executor中运行。


增加回调

很多用户喜欢使用 futures.addcallback(listenablefuture,futurecallback,executor)方法。
futurecallback实现了下面两个方法:


  • onsuccess(v) 当未来成功执行的动作,基于计算结果
  • onfailure(throwable) 当未来失败执行的动作,基于失败

创建

相较于jdk提供的 executorservice.submit(callable)方法来初始化一个异步计算。它返回一个常规的future,
guava提供了listeningexecutorservice接口,它返回listenablefuture。
把executorservice转换为listenableexecutorservice
使用:moreexecutors.listeningdecorator(executorservice)

基础用法如下:

/**
 * 说明:使用例子代码
 * @author carter
 * 创建时间: 2020年03月19日 9:54 上午
 **/

@slf4j
public class listenablefutureutils {

    public static void main(string[] args) {

listeningexecutorservice service = moreexecutors.listeningdecorator(
    executors.newfixedthreadpool(10));


        final listenablefuture<aresult> listenablefuture = service.submit(() -> {
            try {
                timeunit.seconds.sleep(5);
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
            return new aresult(30, "male", 1);

        });


        futures.addcallback(listenablefuture,
                new futurecallback<aresult>() {
                    @override
                    public void onsuccess(aresult aresult) {
                        log.info("计算成功,{}",aresult);
                    }

                    @override
                    public void onfailure(throwable throwable) {

                        log.error("计算错误",throwable);
                        
                    }
                },service);

    }
    
    @data
    @allargsconstructor
    public static class aresult{
        
        private integer age;
        
        private string sex;
        
        private integer id;
        
        
    }
    
}

相对的,如果你想从基于futuretask的api转换过来,
guava提供了
listenablefuturetask.create(callable)

listenablefuturetask.create(runnable)
不同于jdk,listenablefuturetask并不是直接扩展的。

如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用abstractfuture或使用settablefuture ; 

如果你必须转换future为listenablefuture,你别无选择,必须使用 jdkfutureadapters.listeninpoolthread(future)来转换future为listenablefuture
任何时候只要可能,推荐你修改源码让它返回一个 listenablefuture

应用

使用listenablfuture最重要的原因是可以使用链式异步操作。

代码如下:

package com.xxx.demo;

import com.google.common.util.concurrent.asyncfunction;
import com.google.common.util.concurrent.futures;
import com.google.common.util.concurrent.listenablefuture;
import lombok.allargsconstructor;
import lombok.data;

/**
 * 说明:异步操作链
 * @author carter
 * 创建时间: 2020年03月19日 10:11 上午
 **/

public class applicationutils {


    public static void main(string[] args) {

        query query = new query(30);

        listenablefuture<rowkey> rowkeyfuture = lookup(query);

        asyncfunction<rowkey, queryresult> queryfun = rowkey -> readdata(rowkey);

        final listenablefuture<queryresult> queryresultlistenablefuture = 
            futures.transformasync(rowkeyfuture, queryfun);

    }

    private static listenablefuture<queryresult> readdata(rowkey rowkey) {
        return null;
    }

    private static listenablefuture<rowkey> lookup(query query) {
        return null;
    }


    @data
    @allargsconstructor
    public static class rowkey {

        private string id;

    }

    @data
    @allargsconstructor
    public static class query {

        private integer age;

    }


    @data
    @allargsconstructor
    public static class queryresult {

        private string id;
        private string age;

    }


}

很多其他高效支持的操作listenablefuture提供,而future不提供。
不同的操作可以被不同的线程池执行,一个简单的listenablefuture可以有多个操作去等待。

只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。

listenablefuture可以实现这样的操作:它触发了所有请求的回调。

通过少量的工作,我们可以 fan-in.

触发一个listenablefuture 来获得计算结果,当其他的future结束的时候。

futures.allaslist是一个例子。

方法介绍:

方法 描述
transformasync(listenablefuture , asyncfunction , executor) 返回一个新的listenablefuture,它的结果是执行异步函数的返回,函数入参是listenablefuture的返回结果;
transform(listenablefuture , function , executor) 返回一个新的listenablefuture,它的结果是执行函数的返回,函数入参是listenablefuture的返回结果;
 allaslist(iterable) 返回一个listenablefuture,它的结果是一个list,包含每一个列表中的listenablefuture的执行结果,任何一个listenablefuture执行失败或者取消,最后的返回结果取消
successfullaslist(iterable) 返回一个listenablefuture,它的结果是一个list,包含每一个列表中的listenablefuture的执行结果,成功的是结果,失败或者取消的值使用null替代

asyncfunction<a,b> 提供了一个方法 , listenablefuture apply(a inpunt),它可以用来异步的转换值。

代码如下:

package com.xxx.demo;

import com.google.common.collect.lists;
import com.google.common.util.concurrent.futurecallback;
import com.google.common.util.concurrent.futures;
import com.google.common.util.concurrent.listenablefuture;
import lombok.allargsconstructor;
import lombok.data;
import lombok.extern.slf4j.slf4j;

import java.util.list;

/**
 * 说明:成功执行结果汇集
 * @author carter
 * 创建时间: 2020年03月19日 10:34 上午
 **/
@slf4j
public class test3 {

    public static void main(string[] args) {

        list<listenablefuture<queryresult>> querys = lists.newlinkedlist();
        final listenablefuture<list<queryresult>> successfulaslist =
            futures.successfulaslist(querys);
        
        futures.addcallback(successfulaslist, new futurecallback<list<queryresult>>() {
            @override
            public void onsuccess(list<queryresult> queryresults) {
                log.info("执行结果列表:{}",queryresults);
            }

            @override
            public void onfailure(throwable throwable) {
                log.error("执行失败",throwable);
            }
        });


    }

    @data
    @allargsconstructor
    public static class queryresult{
        
        
      private  integer age;
        
    }
    

}

嵌套的future

你的代码调用一个通用接口并返回一个future,很可能最终返回一个嵌套的future.

package com.xxx.demo;

import com.google.common.util.concurrent.listenablefuture;
import com.google.common.util.concurrent.listeningexecutorservice;
import com.google.common.util.concurrent.moreexecutors;
import lombok.allargsconstructor;
import lombok.data;

import java.util.concurrent.callable;
import java.util.concurrent.executors;

/**
 * 说明:嵌套的listenablefuture
 * @author carter
 * 创建时间: 2020年03月19日 10:43 上午
 **/

public class test4 {

    public static void main(string[] args) {


        final listeningexecutorservice executorservice = moreexecutors
            .listeningdecorator(executors.newfixedthreadpool(2));
        final listeningexecutorservice otherexecutorservice = moreexecutors
            .listeningdecorator(executors.newfixedthreadpool(2));


        callable<foo> othercallback =  ()->new foo("aaa");


        final listenablefuture<listenablefuture<foo>> submit = 
                executorservice.submit(() -> otherexecutorservice.submit(othercallback));


    }
    
    @data
    @allargsconstructor
    public static class foo{
        
        private string name;
    }
    
}

例子最后返回的是: listenablefuture<listenablefuture> ,
这个代码不对,因为当外层的future 取消的时候,无法传播到内层的future,
这也是一个 使用get()检查别的future或者listnener的常规的错误,

但是,除非特别关注 否则 othercallback抛出的异常会被压制。
为了避免这种情况,所有的guava的future处理方法(有些从jdk来),有 *async版本来安全的解开这个嵌套。

比如:transform,transformasyn, submit, submitasync方法。

深入研究

原创不易,转载请注明出处。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网