当前位置: 移动技术网 > IT编程>开发语言>Java > 异步编程RxJava-介绍

异步编程RxJava-介绍

2020年03月20日  | 移动技术网IT编程  | 我要评论

前言
前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了future和completablefuture;之后有同学在下面留言提到了rxjava,刚好最近在看这本书,里面提到了响应式扩展(reactive extensions,rx),而rxjava是rx在jvm上的实现,所有打算对rxjava进一步了解。

rxjava简介
rxjava的官网地址:https://github.com/reactivex/rxjava
其中对rxjava进行了一句话描述:rxjava – reactive extensions for the jvm – a library for composing asynchronous and event-based programs using observable sequences for the java vm.
大意就是:一个在java vm上使用可观测的序列来组成异步的、基于事件的程序的库。
更详细的说明在netflix技术博客的一篇文章中描述了rxjava的主要特点:
1.易于并发从而更好的利用服务器的能力。
2.易于有条件的异步执行。
3.一种更好的方式来避免回调地狱。
4.一种响应式方法。

与completablefuture对比
之前提到completablefuture真正的实现了异步的编程模式,一个比较常见的使用场景:

completablefuture<integer> future = completablefuture.supplyasync(耗时函数);
future<integer> f = future.whencomplete((v, e) -> {
        system.out.println(v);
        system.out.println(e);
});
system.out.println("other...");

下面用一个简单的例子来看一下rxjava是如何实现异步的编程模式:

observable<long> observable = observable.just(1, 2)
        .subscribeon(schedulers.io()).map(new func1<integer, long>() {
            @override
            public long call(integer t) {
                try {
                    thread.sleep(1000); //耗时的操作
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
                return (long) (t * 2);
            }
        });
observable.subscribe(new subscriber<long>() {
    @override
    public void oncompleted() {
        system.out.println("oncompleted");
    }
    @override
    public void onerror(throwable e) {
        system.out.println("error" + e);
    }
    @override
    public void onnext(long result) {
        system.out.println("result = " + result);
    }
});
system.out.println("other...");

func1中以异步的方式执行了一个耗时的操作,subscriber(观察者)被订阅到observable(被观察者)中,当耗时操作执行完会回调subscriber中的onnext方法。
其中的异步方式是在subscribeon(schedulers.io())中指定的,schedulers.io()可以理解为每次执行耗时操作都启动一个新的线程。
结构上其实和completablefuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要rxjava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,rxjava还有很多好用的功能,在下面的内容会进行介绍。

异步观察者模式
上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者observable,然后把观察者subscriber添加到了被观察者列表中;
rxjava中一共提供了四种角色:observable、observer、subscriber、subjects
observables和subjects是两个被观察者,observers和subscribers是观察者;
当然我们也可以查看一下源码,看一下jdk中的observer和rxjava的observer
jdk中的observer:

public interface observer {
    void update(observable o, object arg);
}

rxjava的observer:

public interface observer<t> {
    void oncompleted();
    void onerror(throwable e);
    void onnext(t t);
}

同时可以发现subscriber是implements observer的:

public abstract class subscriber<t> implements observer<t>, subscription

可以发现rxjava中在observer中引入了2个新的方法:oncompleted()和onerror()
oncompleted():即通知观察者observable没有更多的数据,事件队列完结
onerror():在事件处理过程中出异常时,onerror()会被触发,同时队列自动终止,不允许再有事件发出。
正是因为rxjava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现rxjava的价值,所以这里给他命名为异步观察者模式

好了,下面正式介绍rxjava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍

maven引入

<dependency>
    <groupid>io.reactivex</groupid>
    <artifactid>rxjava</artifactid>
    <version>1.2.4</version>
</dependency>

创建observable
1.create()创建一个observable,并为它定义事件触发规则

observable<integer> observable = observable
            .create(new observable.onsubscribe<integer>() {
                @override
                public void call(subscriber<? super integer> observer) {
                    for (int i = 0; i < 5; i++) {
                        observer.onnext(i);
                    }
                    observer.oncompleted();
                }
            });
observable.subscribe(new observer<integer>() {...});

2.from()可以从一个列表中创建一个observable,observable将发射出列表中的每一个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items);
observable.subscribe(new observer<integer>() {...});

3.just()将传入的参数依次发送出来

observable<integer> observable = observable.just(1, 2, 3);
observable.subscribe(new observer<integer>() {...});

过滤observable
1.filter()来过滤我们观测序列中不想要的值

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).filter(
        new func1<integer, boolean>() {
            @override
            public boolean call(integer t) {
                return t == 1;
            }
        });
observable.subscribe(new observer<integer>() {...});

2.take()和tasklast()分别取前几个元素和后几个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).take(3);
observable.subscribe(new observer<integer>() {...});
observable<integer> observable = observable.from(items).takelast(2);

3.distinct()和distinctuntilchanged()
distinct()过滤掉重复的值

list<integer> items = new arraylist<integer>();
items.add(1);
items.add(10);
items.add(10);
observable<integer> observable = observable.from(items).distinct();
observable.subscribe(new observer<integer>() {...});

distinctuntilchanged()列发射一个不同于之前的一个新值时让我们得到通知

list<integer> items = new arraylist<integer>();
items.add(1);
items.add(100);
items.add(100);
items.add(200);
observable<integer> observable = observable.from(items).distinctuntilchanged();
observable.subscribe(new observer<integer>() {...});

4.first()和last()分别取第一个元素和最后一个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// observable<integer> observable = observable.from(items).first();
observable<integer> observable = observable.from(items).last();
observable.subscribe(new observer<integer>() {...});

5.skip()和skiplast()分别从前或者后跳过几个元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// observable<integer> observable = observable.from(items).skip(2);
observable<integer> observable = observable.from(items).skiplast(2);
observable.subscribe(new observer<integer>() {...});

6.elementat()取第几个元素进行发射

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).elementat(2);
observable.subscribe(new observer<integer>() {...});

7.sample()指定发射间隔进行发射

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 50000; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).sample(1,timeunit.microseconds);
observable.subscribe(new observer<integer>() {...});

8.timeout()设定的时间间隔内如果没有得到一个值则发射一个错误

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).timeout(1,timeunit.microseconds);
observable.subscribe(new observer<integer>() {...onerror()...});

9.debounce()在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).debounce(1,timeunit.microseconds);
observable.subscribe(new observer<integer>() {...});

转换observable
1.map()接收一个指定的func对象然后将它应用到每一个由observable发射的值上

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).map(
        new func1<integer, integer>() {
            @override
            public integer call(integer t) {
                return t * 2;
            }
        });
observable.subscribe(new observer<integer>() {...});

2.flatmap()函数提供一种铺平序列的方式,然后合并这些observables发射的数据

final scheduler scheduler = schedulers.from(executors.newfixedthreadpool(3));
list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).flatmap(
        new func1<integer, observable<? extends integer>>() {
            @override
            public observable<? extends integer> call(integer t) {
                list<integer> items = new arraylist<integer>();
                items.add(t);
                items.add(99999);
                return observable.from(items).subscribeon(scheduler);
            }
        });
observable.subscribe(new observer<integer>() {...});

重要的一点提示是关于合并部分:它允许交叉。这意味着flatmap()不能够保证在最终生成的observable中源observables确切的发射
顺序。

3.concatmap()函数解决了flatmap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。
示例代码同上,将flatmap替换为concatmap,输出的结果来看是有序的

4.switchmap()和flatmap()很像,除了一点:每当源observable发射一个新的数据项(observable)时,它将取消订阅并停止监视之前那个数据项产生的observable,并开始监视当前发射的这一个。
示例代码同上,将flatmap替换为switchmap,输出的结果只剩最后一个值

5.scan()是一个累积函数,对原始observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<integer> observable = observable.from(items).scan(
        new func2<integer, integer, integer>() {
 
            @override
            public integer call(integer t1, integer t2) {
                system.out.println(t1 + "+" + t2);
                return t1 + t2;
            }
        });
observable.subscribe(new observer<integer>() {...});

6.groupby()来分组元素

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<groupedobservable<integer, integer>> observable = observable
                .from(items).groupby(new func1<integer, integer>() {
                    @override
                    public integer call(integer t) {
                        return t % 3;
                    }
                });
observable.subscribe(new observer<groupedobservable<integer, integer>>() {
        @override
        public void onnext(final groupedobservable<integer, integer> t) {
            t.subscribe(new action1<integer>() {
                @override
                public void call(integer value) {
                    system.out.println("key:" + t.getkey()+ ", value:" + value);
                }
            });
                  
});

7.buffer()函数将源observable变换一个新的observable,这个新的observable每次发射一组列表值而不是一个一个发射。

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<list<integer>> observable = observable.from(items).buffer(2);
observable.subscribe(new observer<list<integer>>() {...});

8.window()函数和 buffer()很像,但是它发射的是observable而不是列表

list<integer> items = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
observable<observable<integer>> observable = observable.from(items).window(2);
observable.subscribe(new observer<observable<integer>>() {
    @override
    public void onnext(observable<integer> t) {
        t.subscribe(new action1<integer>() {
            @override
            public void call(integer t) {
                system.out.println("this action1 = " + this+ ",result = " + t);
            }
        });
        //oncompleted和onerror
});

9.cast()它将源observable中的每一项数据都转换为新的类型,把它变成了不同的class

list<father> items = new arraylist<father>();
items.add(new son());
items.add(new son());
items.add(new father());
items.add(new father());
observable<son> observable = observable.from(items).cast(son.class);
observable.subscribe(new observer<son>() {...});
 
class father {
}
 
class son extends father {
}

组合observables
1.merge()方法将帮助你把两个甚至更多的observables合并到他们发射的数据项里

list<integer> items1 = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items1.add(i);
}
list<integer> items2 = new arraylist<integer>();
for (int i = 5; i < 10; i++) {
    items2.add(i);
}
observable<integer> observable1 = observable.from(items1);
observable<integer> observable2 = observable.from(items2);
observable<integer> observablemerge = observable.merge(observable1,observable2);
observable.subscribe(new observer<integer>() {...});

2.zip()合并两个或者多个observables发射出的数据项,根据指定的函数 func* 变换它们,并发射一个新值

list<integer> items1 = new arraylist<integer>();
for (int i = 0; i < 5; i++) {
    items1.add(i);
}
list<integer> items2 = new arraylist<integer>();
for (int i = 5; i < 10; i++) {
    items2.add(i);
}
observable<integer> observable1 = observable.from(items1);
observable<integer> observable2 = observable.from(items2);
observable<integer> observablezip = observable.zip(observable1,
        observable2, new func2<integer, integer, integer>() {
            @override
            public integer call(integer t1, integer t2) {
                return t1 * t2;
            }
        });
observable.subscribe(new observer<integer>() {...});

3.combinelatest()把两个observable产生的结果进行合并,这两个observable中任意一个observable产生的结果,都和另一个observable最后产生的结果,按照一定的规则进行合并。

observable<long> observable1 = observable.interval(1000,timeunit.milliseconds);
observable<long> observable2 = observable.interval(1000,timeunit.milliseconds);
observable.combinelatest(observable1, observable2,
        new func2<long, long, long>() {
            @override
            public long call(long t1, long t2) {
                system.out.println("t1 = " + t1 + ",t2 = " + t2);
                return t1 + t2;
            }
        }).subscribe(new observer<long>() {...});
thread.sleep(100000);

4.join()类似combinelatest(),但是join操作符可以控制每个observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个observable产生的结果按照一定的规则进行合并

observable<long> observable1 = observable.interval(1000,
                timeunit.milliseconds);
        observable<long> observable2 = observable.interval(1000,
                timeunit.milliseconds);
        observable1.join(observable2, new func1<long, observable<long>>() {
            @override
            public observable<long> call(long t) {
                system.out.println("left=" + t);
                return observable.just(t).delay(1000, timeunit.milliseconds);
            }
        }, new func1<long, observable<long>>() {
            @override
            public observable<long> call(long t) {
                system.out.println("right=" + t);
                return observable.just(t).delay(1000, timeunit.milliseconds);
            }
        }, new func2<long, long, long>() {
            @override
            public long call(long t1, long t2) {
                return t1 + t2;
            }
        }).subscribe(new observer<long>() {
            @override
            public void oncompleted() {
                system.out.println("observable  completed");
            }
 
            @override
            public void onerror(throwable e) {
                system.out.println("oh,no!  something   wrong   happened!");
            }
 
            @override
            public void onnext(long t) {
                system.out.println("[result=]" + t);
            }
        });
 
        thread.sleep(100000);

5.switchonnext()把一组observable转换成一个observable,对于这组observable中的每一个observable所产生的结果,如果在同一个时间内存在两个或多个observable提交的结果,只取最后一个observable提交的结果给订阅者

observable<observable<long>> observable = observable.interval(2, timeunit.seconds)
        .map(new func1<long, observable<long>>() {
            @override
            public observable<long> call(long along) {
                return observable.interval(1, timeunit.milliseconds).take(5);
            }
        }).take(2);
 
observable.switchonnext(observable).subscribe(new observer<long>() {...});
thread.sleep(1000000);

6.startwith()在observable开始发射他们的数据之前,startwith()通过传递一个参数来先发射一个数据序列

observable.just(1000, 2000).startwith(1, 2).subscribe(new observer<integer>() {...});

总结
本文主要对rxjava进行了简单的介绍,从异步编程这个角度对rxjava进行了分析;并且针对observable的过滤,转换,组合的api进行了简单的介绍,当然我们更关心的是rxjava有哪些应用场景。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网