酷酷链,jianghuyinniang,景泰县人民政府网
一、推迟执行动作
可以使用timer+map方法实现.代码如下:
observable.timer(5, timeunit.milliseconds).map(value->{ return dosomething(); }).subscribe(system.out::println); }
二、推迟发送执行的结果
这种场景要求产生数据的动作是马上执行,但是结果推迟发送.这和上面场景的是不一样的.
这种场景可以使用observable.zip
来实现.
zip操作符将多个observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的observable来决定。
对于各个observable相同位置的数据,需要相互等待,也就说,第一个observable第一个位置的数据产生后,要等待第二个observable第一个位置的数据产生,等各个observable相同位置的数据都产生后,才能按指定规则进行组合.这真是我们要利用的.
zip有很多种声明,但大致上是一样的,就是传入几个observable,然后指定一个规则,对每个observable对应位置的数据进行处理,产生一个新的数据, 下面是其中一个最简单的:
public static <t1, t2, r> observable<r> zip(observable<? extends t1> o1, observable<? extends t2> o2, final func2<? super t1, ? super t2, ? extends r> zipfunction);
用zip实现推送发送执行结果如下:
observable.zip(observable.timer(5,timeunit.milliseconds) ,observable.just(dosomething()), (x,y)->y) .subscribe(system.out::println));
三、使用defer在指定线程里执行某种动作
如下面的代码,虽然我们指定了线程的运行方式,但是dosomething()
这个函数还是在当前代码调用的线程中执行的.
observable.just(dosomething()) .subscribeon(schedulers.io()) .observeon(schedulers.computation()) .subscribe(v->utils.printlnwiththread(v.tostring()););
通常我们采用下面的方法达到目的:
observable.create(s->{s.onnext(dosomething());}) .subscribeon(schedulers.io()) .observeon(schedulers.computation()) .subscribe(v->{ utils.printlnwiththread(v.tostring()); });
但其实我们采用defer也能达到相同的目的.
关于defer
defer 操作符与create、just、from等操作符一样,是创建类操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。
声明:
public static <t> observable<t> defer(func0<observable<t>> observablefactory);
defer的func0里的observable是在订阅(subscribe)的时候才创建的.
作用:
do not create the observable until an observer subscribes; create a fresh observable on each subscription.
也就说observable是在订阅的时候才创建的.
上面的问题用defer实现:
observable.defer(()->observable.just(dosomething())) .subscribeon(schedulers.io()) .observeon(schedulers.computation()) .subscribe(v->{utils.printlnwiththread(v.tostring()); });
四、使用compose不要打断链式结构
我们经常看到下面的代码:
observable.just(dosomething()) .subscribeon(schedulers.io()) .observeon(schedulers.computation()) .subscribe(v->{utils.printlnwiththread(v.tostring());
上面的代码中,subscribeon(xxx).observeon(xxx)
可能在很多地方都是一样的, 如果我们打算把它统一在某一个地方实现, 我们可以这么写:
private static <t> observable<t> applyschedulers(observable<t> observable) { return observable.subscribeon(schedulers.io()) .observeon(schedulers.computation()); }
但是这样每次我们需要调用上面的方法, 大致会像下面这样,最外面是一个函数,等于打破了链接结构:
applyschedulers(observable.from(somesource).map(new func1<data, data>() { @override public data call(data data) { return manipulate(data); } }) ).subscribe(new action1<data>() { @override public void call(data data) { dosomething(data); } });
可以使用compose操作符达到不打破链接结构的目的.
compose的申明如下:
public observable compose(transformer<? super t, ? extends r> transformer);
它的入参是一个transformer接口,输出是一个observable. 而transformer实际上就是一个func1<observable<t>
, observable<r>>
,换言之就是:可以通过它将一种类型的observable转换成另一种类型的observable.
简单的说,compose可以通过指定的转化方式(输入参数transformer),将原来的observable转化为另外一种observable.
通过compose, 采用下面方式指定线程方式:
private static <t> transformer<t, t> applyschedulers() { return new transformer<t, t>() { @override public observable<t> call(observable<t> observable) { return observable.subscribeon(schedulers.io()) .observeon(schedulers.computation()); } }; } observable.just(dosomething()).compose(applyschedulers()) .subscribe(v->{utils.printlnwiththread(v.tostring()); });
函数applyschedulers可以使用lambda表达式进一步简化为下面为:
private static <t> transformer<t, t> applyschedulers() { return observable->observable.subscribeon(schedulers.io()) .observeon(schedulers.computation()); }
五、按优先级使用不同的执行结果
上面这个标题估计没表达清楚我想表达的场景. 其实我想表达的场景类似于平常的获取网络数据场景:如果缓存有,从缓存获取,如果没有,再从网络获取.
这里要求,如果缓存有,不会做从网络获取数据的动作.
这个可以采用concat+first实现.
concat将几个observable合并成一个observable,返回最终的一个observable. 而那些数据就像从一个observable发出来一样. 参数可以是多个observable,也可以是包含observalbe的iterator.
新的observable内的数据排列按原来concat里的observable顺序排列,即新结果内的数据是按原来的顺序排序的.
下面是上述需求的实现:
observable.concat(getdatafromcache(),getdatafromnetwork()).first() .subscribe(v->system.out.println("result:"+v)); //从缓存获取数据 private static observable<string> getdatafromcache(){ return observable.create(s -> { //dosomething to get data int value = new random().nextint(); value = value%2; if (value!=0){ s.onnext("data from cache:"+value); //产生数据 } //s.onerror(new throwable("none")); s.oncompleted(); } ); } //从网络获取数据 private static observable<string> getdatafromnetwork(){ return observable.create(s -> { for (int i = 0; i < 10; i++) { utils.println("obs2 generate "+i); s.onnext("data from network:" + i); //产生数据 } s.oncompleted(); } ); }
上面的实现,如果getdatafromcache有数据, getdatafromnetwork这里的代码是不会执行的, 这正是我们想要的.
上面实现有几个需要注意:
1、有可能从两个地方都获取不到数据, 这种场景下使用first会抛出异常nosuchelementexception,如果是这样的场景,需要用firstordefault替换上面的first.
2、上面getdatafromcache()
里,如果没有数据,我们直接调用oncompleted,如果不调用oncompleted,而是调用onerror,则上述采用concat是得不到任何结果的.因为concat在收到任何一个error,合并就会停止.所以,如果要用onerror, 则需要用concatdelayerror替代concat.concatdelayerror
会先忽略error,将error推迟到最后在处理.
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
apollo与springboot集成实现动态刷新配置的教程详解
网友评论