当前位置: 移动技术网 > IT编程>移动开发>Android > Android之RxJava详解

Android之RxJava详解

2019年04月03日  | 移动技术网IT编程  | 我要评论

tfboys快乐大本营什么时候播,昌邑市人民政府,口才与演讲

文章大纲

一、什么是rxjava
二、为什么要用rxjava
三、rxjava使用详解
四、项目源码下载
五、参考文章

一、什么是rxjava

  rx(reactive extensions)是一个库,用来处理事件和异步任务,在很多语言上都有实现,rxjava是rx在java上的实现。简单来说,rxjava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过obserable和observer的机制,实现所谓响应式的编程体验。

二、为什么要用rxjava

  比如说一个庞大的项目,一个事件传递的整个过程可能要经历很多方法,方法套方法,每个方法的位置七零八落,一个个方法跳进去看,跳过去跳过来很容易把脑袋弄晕,不够直观。但是rxjava可以把所有逻辑用链式加闭包的方式呈现,做了哪些操作,谁在前谁在后非常直观,逻辑清晰,维护就会非常轻松。就算不是你写的你也可以很快的了解,你可以把它看作一条河流,整个过程就是对里面的水流做进行加工。懂了这个特性我们才知道在复杂的逻辑中运用rxjava是多么的重要。
  假设有这样一个需求:界面上有一个自定义的视图 imagecollectorview ,它的作用是显示多张图片,并能使用 addimage(bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 file[] folders 中每个目录下的 png 图片都加载出来并显示在 imagecollectorview 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 ui 线程执行。常用的实现方式有多种,我这里贴出其中一种:

new thread() {
    @override
    public void run() {
        super.run();
        for (file folder : folders) {
            file[] files = folder.listfiles();
            for (file file : files) {
                if (file.getname().endswith(".png")) {
                    final bitmap bitmap = getbitmapfromfile(file);
                    getactivity().runonuithread(new runnable() {
                        @override
                        public void run() {
                            imagecollectorview.addimage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 rxjava ,实现方式是这样的:

observable.from(folders)
    .flatmap(new func1<file, observable<file>>() {
        @override
        public observable<file> call(file file) {
            return observable.from(file.listfiles());
        }
    })
    .filter(new func1<file, boolean>() {
        @override
        public boolean call(file file) {
            return file.getname().endswith(".png");
        }
    })
    .map(new func1<file, bitmap>() {
        @override
        public bitmap call(file file) {
            return getbitmapfromfile(file);
        }
    })
    .subscribeon(schedulers.io())
    .observeon(androidschedulers.mainthread())
    .subscribe(new action1<bitmap>() {
        @override
        public void call(bitmap bitmap) {
            imagecollectorview.addimage(bitmap);
        }
    });

三、rxjava使用详解

1. rxjava设计模式

  rxjava 的异步实现,是通过一种扩展的观察者模式来实现的。
  观察者模式面向的需求是:a 对象(观察者)对 b 对象(被观察者)的某种变化高度敏感,需要在 b 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 a 不需要每过 2ms 就检查一次 b 的状态),而是采用注册(register)或者称为订阅(subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 android 开发中一个比较典型的例子是点击监听器 onclicklistener 。对设置 onclicklistener来说, view 是被观察者, onclicklistener 是观察者,二者通过 setonclicklistener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,android framework 就会将点击事件发送给已经注册的 onclicklistener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

onclicklistener 的模式大致如下图:

 
onclicklistener 观察者模式

rxjava 的观察者模式
  rxjava 有四个基本概念:observable (可观察者,即被观察者)、 observer (观察者)、 subscribe (订阅)、事件。observable 和 observer 通过 subscribe() 方法实现订阅关系,从而 observable 可以在需要的时候发出事件来通知 observer
  与传统观察者模式不同, rxjava 的事件回调方法除了普通事件 onnext() (相当于 onclick() / onevent())之外,还定义了两个特殊的事件:oncompleted() 和 onerror()

  • oncompleted(): 事件队列完结。rxjava 不仅把每个事件单独处理,还会把它们看做一个队列。rxjava 规定,当不会再有新的 onnext() 发出时,需要触发 oncompleted() 方法作为标志。
  • onerror(): 事件队列异常。在事件处理过程中出异常时,onerror() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, oncompleted() 和 onerror() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,oncompleted() 和 onerror() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

rxjava 的观察者模式大致如下图:

 
rxjava 的观察者模式

开始接入rxjava之间,添加依赖

dependencies {
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.1.6'
  }

2. 创建rxjava几种方式

方式1:简单创建rxjava

/**
     * 简单创建rxjava
     *
     * observable是被观察者,创建后传入一个onsubscribe对象,当observable(观察者)调用subscribe进行注册观察者时,onsubscribe的call方法会触发。
     observableemitter: emitter 是发射器的意思,它可以发出三种类型的事件,与之对应的。
     observer有三个回调方法:

     onnext:接受到一个事件
     oncompleted:接受完事件后调用,只会调用一次
     onerror :发生错误时调用,并停止接受事件,调用一次

     注:oncompleted和onerror不会同时调用,只会调用其中之一
     */
    public static void createone() {

        //创建被观察者
        observable observable = observable.create(new observable.onsubscribe<string>() {
            @override
            public void call(subscriber<? super string> subscriber) {
                subscriber.onnext("hello");
                subscriber.onnext("吴");
                subscriber.onnext("晓畅");
                subscriber.oncompleted();
            }
        });

        subscriber<string> subscriber = new subscriber<string>() {
            @override
            public void onnext(string s) {

                system.out.println("item: " + s);
            }

            ////事件队列完结,rxjava 规定,当不会再有新的 onnext() 发出时,需要触发 oncompleted() 方法作为标志。
            @override
            public void oncompleted() {

                system.out.println("completed!");
            }

            ////事件队列异常。在事件处理过程中出异常时,onerror() 会被触发,同时队列自动终止,不允许再有事件发出。
            @override
            public void onerror(throwable e) {
                system.out.println("error!");
            }
        };

        observable.subscribe(subscriber);

    }

运行结果如下所示:

 

方式2:just(t...): 将传入的参数依次发送出来

public static void createtwo()
    {

        //相当于
        // 将会依次调用:
        // onnext("hello");
        // onnext("hi");
        // onnext("aloha");
        // oncompleted();
        observable observable = observable.just("hello", "wu", "xiaochang");

        subscriber<string> subscriber = new subscriber<string>() {
            @override
            public void onnext(string s) {

                system.out.println("item: " + s);
            }

            ////事件队列完结,rxjava 规定,当不会再有新的 onnext() 发出时,需要触发 oncompleted() 方法作为标志。
            @override
            public void oncompleted() {

                system.out.println("completed!");
            }

            ////事件队列异常。在事件处理过程中出异常时,onerror() 会被触发,同时队列自动终止,不允许再有事件发出。
            @override
            public void onerror(throwable e) {
                system.out.println("error!");
            }
        };

        observable.subscribe(subscriber);

    }

运行结果如下所示:

 

方式3:将传入的数组或 iterable 拆分成具体对象后,依次发送出来

 public static void createthree()
    {

        string[] words = {"hello", "wu", "xiaochang"};

        //相当于
        // 将会依次调用:
        // onnext("hello");
        // onnext("hi");
        // onnext("aloha");
        // oncompleted();
        observable observable = observable.from(words);

        subscriber<string> subscriber = new subscriber<string>() {
            @override
            public void onnext(string s) {

                system.out.println("item: " + s);
            }

            ////事件队列完结,rxjava 规定,当不会再有新的 onnext() 发出时,需要触发 oncompleted() 方法作为标志。
            @override
            public void oncompleted() {

                system.out.println("completed!");
            }

            ////事件队列异常。在事件处理过程中出异常时,onerror() 会被触发,同时队列自动终止,不允许再有事件发出。
            @override
            public void onerror(throwable e) {
                system.out.println("error!");
            }
        };

        observable.subscribe(subscriber);
    }

运行结果如下图所示:

 

方式4:发送多种类型参数

/**
     *发送多种类型参数
     */
    public static void createfour()
    {
        //just类似于from,但是from会将数组或iterable的元素具取出然后逐个发射,而just只是简单的原样发射,将数组或iterable当做单个数据。
        //just接受一至九个参数,返回一个按参数列表顺序发射这些数据的observable
        observable justobservable = observable.just(1, "something", false, 3.256f, "newyork");
        justobservable.subscribe(new subscriber() {
            @override
            public void oncompleted() {
                system.out.println("oncompleted!");
            }

            @override
            public void onerror(throwable e) {
                system.out.println(e.getmessage());
            }

            @override
            public void onnext(object o) {

                system.out.println(o);
            }
        });
    }

运行结果如下所示:

 

方式5:自定义subscriber

   /**
     * 自定义subscriber
     */
    public static void createfive()
    {

        observable observable = observable.just("hello", "hi", "aloha");

        action1<string> onnextaction = new action1<string>() {
            // onnext()
            @override
            public void call(string s) {
                system.out.println(s);
            }
        };
        action1<throwable> onerroraction = new action1<throwable>() {
            // onerror()
            @override
            public void call(throwable throwable) {
                // error handling
            }
        };
        action0 oncompletedaction = new action0() {
            // oncompleted()
            @override
            public void call() {
                system.out.println("completed");
            }
        };

        // 自动创建 subscriber ,并使用 onnextaction 来定义 onnext()
                observable.subscribe(onnextaction);
        // 自动创建 subscriber ,并使用 onnextaction 和 onerroraction 来定义 onnext() 和 onerror()
                observable.subscribe(onnextaction, onerroraction);
        // 自动创建 subscriber ,并使用 onnextaction、 onerroraction 和 oncompletedaction 来定义 onnext()、 onerror() 和 oncompleted()
                observable.subscribe(onnextaction, onerroraction, oncompletedaction);
    }

运行结果如下图所示:

 

3. 创建观察者方法

创建方式如下:

        observer<string> observer = new observer<string>() {
            @override
            public void onnext(string s) {
                log.d(tag, "item: " + s);
            }

            @override
            public void oncompleted() {
                log.d(tag, "completed!");
            }

            @override
            public void onerror(throwable e) {
                log.d(tag, "error!");
            }
        };

        //创建方式2
        subscriber<string> subscriber = new subscriber<string>() {
            @override
            public void onnext(string s) {
                log.d("mainactivity", "item: " + s);
            }

            @override
            public void oncompleted() {
                log.d("mainactivity", "completed!");
            }

            @override
            public void onerror(throwable e) {
                log.d("mainactivity", "error!");
            }
        };

  实质上,在 rxjava 的 subscribe 过程中,observer 也总是会先被转换成一个 subscriber 再使用。所以如果你只想使用基本功能,选择 observer 和 subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:
  onstart(): 这是 subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onstart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doonsubscribe() 方法,具体可以在后面的文中看到。
  unsubscribe(): 这是 subscriber 所实现的另一个接口 subscription 的方法,用于取消订阅。在这个方法被调用后,subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isunsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, observable 会持有 subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onpause() onstop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

3. 线程scheduler (调度器)

  在不指定线程的情况下, rxjava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 scheduler (调度器)。
在rxjava 中,scheduler ——调度器,相当于线程控制器,rxjava 通过它来指定每一段代码应该运行在什么样的线程。rxjava 已经内置了几个 scheduler ,它们已经适合大多数的使用场景:
  schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 scheduler。
  schedulers.newthread(): 总是启用新线程,并在新线程执行操作。
  schedulers.io(): i/o 操作(读写文件、读写数据库、网络信息交互等)所使用的 scheduler。行为模式和 newthread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newthread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  schedulers.computation(): 计算所使用的 scheduler。这个计算指的是 cpu 密集型计算,即不会被 i/o 等操作限制性能的操作,例如图形的计算。这个 scheduler 使用的固定的线程池,大小为 cpu 核数。不要把 i/o 操作放在 computation() 中,否则 i/o 操作的等待时间会浪费 cpu。
另外, android 还有一个专用的 androidschedulers.mainthread(),它指定的操作将在 android 主线程运行。
  有了这几个 scheduler ,就可以使用 subscribeon() 和 observeon() 两个方法来对线程进行控制了。 * subscribeon(): 指定 subscribe() 所发生的线程,即 observable.onsubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeon(): 指定 subscriber 所运行在的线程。或者叫做事件消费的线程。

public class rxjavascheduler {

    public static void showscheduler()
    {
        observable.just(1, 2, 3, 4)
//                .subscribeon(schedulers.io()) // 指定 subscribe() 发生在 io 线程

//                .observeon(androidschedulers.mainthread()) // 指定 subscriber 的回调发生在主线程

                .subscribe(new action1<integer>() {
                    @override
                    public void call(integer number) {

                        system.out.println("number:" + number);
                    }
                });

    }


    public static void main(string[] args) {

        showscheduler();
    }
    

}

四、项目源码下载

链接:https://pan.baidu.com/s/1na7dh_n2rf-pxeadmqxguq
密码:xvr2

五、参考文章

    1. http://gank.io/post/560e15be2dca930e00da1083
    2. https://blog.csdn.net/xu_song/article/details/78686439

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网