当前位置: 移动技术网 > IT编程>移动开发>Android > Android中通过RxJava进行响应式程序设计的入门指南

Android中通过RxJava进行响应式程序设计的入门指南

2019年07月24日  | 移动技术网IT编程  | 我要评论

理财通收益,高辣文h书包网,物联网龙头

错误处理

到目前为止,我们都没怎么介绍oncomplete()和onerror()函数。这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了)。

下面的代码展示了怎么使用这两个函数:

observable.just("hello, world!")
  .map(s -> potentialexception(s))
  .map(s -> anotherpotentialexception(s))
  .subscribe(new subscriber<string>() {
    @override
    public void onnext(string s) { system.out.println(s); }

    @override
    public void oncompleted() { system.out.println("completed!"); }

    @override
    public void onerror(throwable e) { system.out.println("ouch!"); }
  });

代码中的potentialexception() 和 anotherpotentialexception()有可能会抛出异常。每一个observerable对象在终结的时候都会调用oncompleted()或者onerror()方法,所以demo中会打印”completed!”或者”ouch!”。

这种模式有以下几个优点:

1.只要有异常发生onerror()一定会被调用

这极大的简化了错误处理。只需要在一个地方处理错误即可以。

2.操作符不需要处理异常

将异常处理交给订阅者来做,observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onerror()方法。

3.你能够知道什么时候订阅者已经接收了全部的数据。

知道什么时候任务结束能够帮助简化代码的流程。(虽然有可能observable对象永远不会结束)

我觉得这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。

使用rxjava,observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。

调度器

假设你编写的android app需要从网络请求数据(感觉这是必备的了,还有单机么?)。网络请求需要花费较长的时间,因此你打算在另外一个线程中加载数据。那么问题来了!

编写多线程的android应用程序是很难的,因为你必须确保代码在正确的线程中运行,否则的话可能会导致app崩溃。最常见的就是在非主线程更新ui。

使用rxjava,你可以使用subscribeon()指定观察者代码运行的线程,使用observeron()指定订阅者运行的线程:

myobservableservices.retrieveimage(url)
  .subscribeon(schedulers.io())
  .observeon(androidschedulers.mainthread())
  .subscribe(bitmap -> myimageview.setimagebitmap(bitmap));

是不是很简单?任何在我的subscriber前面执行的代码都是在i/o线程中运行。最后,操作view的代码在主线程中运行.

最棒的是我可以把subscribeon()和observeron()添加到任何observable对象上。这两个也是操作符!。我不需要关心observable对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度。

如果使用asynctask或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用rxjava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以。

订阅(subscriptions)

当调用observable.subscribe(),会返回一个subscription对象。这个对象代表了被观察者和订阅者之间的联系。

ubscription subscription = observable.just("hello, world!")
  .subscribe(s -> system.out.println(s));

你可以在后面使用这个subscription对象来操作被观察者和订阅者之间的联系.

subscription.unsubscribe();
system.out.println("unsubscribed=" + subscription.isunsubscribed());
// outputs "unsubscribed=true"

rxjava的另外一个好处就是它处理unsubscribing的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用unsubscribe将会在他当前执行的地方终止。不需要做任何额外的工作!

rxandroid

rxandroid是rxjava的一个针对android平台的扩展。它包含了一些能够简化android开发的工具。

首先,androidschedulers提供了针对android的线程系统的调度器。需要在ui线程中运行某些代码?很简单,只需要使用androidschedulers.mainthread():

retrofitservice.getimage(url)
  .subscribeon(schedulers.io())
  .observeon(androidschedulers.mainthread())
  .subscribe(bitmap -> myimageview.setimagebitmap(bitmap));

如果你已经创建了自己的handler,你可以使用handlerthreadscheduler1将一个调度器链接到你的handler上。

接着要介绍的就是androidobservable,它提供了跟多的功能来配合android的生命周期。bindactivity()和bindfragment()方法默认使用androidschedulers.mainthread()来执行观察者代码,这两个方法会在activity或者fragment结束的时候通知被观察者停止发出新的消息。

androidobservable.bindactivity(this, retrofitservice.getimage(url))
  .subscribeon(schedulers.io())
  .subscribe(bitmap -> myimageview.setimagebitmap(bitmap);

我自己也很喜欢androidobservable.frombroadcast()方法,它允许你创建一个类似broadcastreceiver的observable对象。下面的例子展示了如何在网络变化的时候被通知到:

intentfilter filter = new intentfilter(connectivitymanager.connectivity_action);
androidobservable.frombroadcast(context, filter)
  .subscribe(intent -> handleconnectivitychange(intent));

最后要介绍的是viewobservable,使用它可以给view添加了一些绑定。如果你想在每次点击view的时候都收到一个事件,可以使用viewobservable.clicks(),或者你想监听textview的内容变化,可以使用viewobservable.text()。

viewobservable.clicks(mcardnameedittext, false)
  .subscribe(view -> handleclick(view));

retrofit

大名鼎鼎的retrofit库内置了对rxjava的支持(官方下载页http://square.github.io/retrofit/#download)。通常调用发可以通过使用一个callback对象来获取异步的结果:

@get("/user/{id}/photo")
void getuserphoto(@path("id") int id, callback<photo> cb);

使用rxjava,你可以直接返回一个observable对象。

@get("/user/{id}/photo")
observable<photo> getuserphoto(@path("id") int id);

现在你可以随意使用observable对象了。你不仅可以获取数据,还可以进行变换。
retrofit对observable的支持使得它可以很简单的将多个rest请求结合起来。比如我们有一个请求是获取照片的,还有一个请求是获取元数据的,我们就可以将这两个请求并发的发出,并且等待两个结果都返回之后再做处理:

observable.zip(
  service.getuserphoto(id),
  service.getphotometadata(id),
  (photo, metadata) -> createphotowithdata(photo, metadata))
  .subscribe(photowithdata -> showphoto(photowithdata));

在第二篇里我展示过一个类似的例子(使用flatmap())。这里我只是想展示以下使用rxjava+retrofit可以多么简单地组合多个rest请求。

遗留代码,运行极慢的代码

retrofit可以返回observable对象,但是如果你使用的别的库并不支持这样怎么办?或者说一个内部的内码,你想把他们转换成observable的?有什么简单的办法没?

绝大多数时候observable.just() 和 observable.from() 能够帮助你从遗留代码中创建 observable 对象:

private object oldmethod() { ... }

public observable<object> newmethod() {
  return observable.just(oldmethod());
}

上面的例子中如果oldmethod()足够快是没有什么问题的,但是如果很慢呢?调用oldmethod()将会阻塞住他所在的线程。
为了解决这个问题,可以参考我一直使用的方法–使用defer()来包装缓慢的代码:

private object slowblockingmethod() { ... }

public observable<object> newmethod() {
  return observable.defer(() -> observable.just(slowblockingmethod()));
}

现在,newmethod()的调用不会阻塞了,除非你订阅返回的observable对象。

生命周期

我把最难的不分留在了最后。如何处理activity的生命周期?主要就是两个问题:
1.在configuration改变(比如转屏)之后继续之前的subscription。

比如你使用retrofit发出了一个rest请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?

2.observable持有context导致的内存泄露

这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果observable没有及时结束,内存占用就会越来越大。
不幸的是,没有银弹来解决这两个问题,但是这里有一些指导方案你可以参考。

第一个问题的解决方案就是使用rxjava内置的缓存机制,这样你就可以对同一个observable对象执行unsubscribe/resubscribe,却不用重复运行得到observable的代码。cache() (或者 replay())会继续执行网络请求(甚至你调用了unsubscribe也不会停止)。这就是说你可以在activity重新创建的时候从cache()的返回值中创建一个新的observable对象。

observable<photo> request = service.getuserphoto(id).cache();
subscription sub = request.subscribe(photo -> handleuserphoto(photo));

// ...when the activity is being recreated...
sub.unsubscribe();

// ...once the activity is recreated...
request.subscribe(photo -> handleuserphoto(photo));

注意,两次sub是使用的同一个缓存的请求。当然在哪里去存储请求的结果还是要你自己来做,和所有其他的生命周期相关的解决方案一延虎,必须在生命周期外的某个地方存储。(retained fragment或者单例等等)。

第二个问题的解决方案就是在生命周期的某个时刻取消订阅。一个很常见的模式就是使用compositesubscription来持有所有的subscriptions,然后在ondestroy()或者ondestroyview()里取消所有的订阅。

private compositesubscription mcompositesubscription
  = new compositesubscription();

private void dosomething() {
  mcompositesubscription.add(
    androidobservable.bindactivity(this, observable.just("hello, world!"))
    .subscribe(s -> system.out.println(s)));
}

@override
protected void ondestroy() {
  super.ondestroy();

  mcompositesubscription.unsubscribe();
}

你可以在activity/fragment的基类里创建一个compositesubscription对象,在子类中使用它。

注意! 一旦你调用了 compositesubscription.unsubscribe(),这个compositesubscription对象就不可用了, 如果你还想使用compositesubscription,就必须在创建一个新的对象了。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网