当前位置: 移动技术网 > IT编程>开发语言>Java > Rxjava功能操作符的使用方法详解

Rxjava功能操作符的使用方法详解

2019年07月19日  | 移动技术网IT编程  | 我要评论

rxjava功能个人感觉很好用,里面的一些操作符很方便,rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// because rxandroid releases are few and far between, it is recommended you also
// explicitly depend on rxjava's latest version for bug fixes and new features.
  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.bundle;
import android.support.v7.app.appcompatactivity;
import android.view.view;
import android.widget.textview;
 
import com.alibaba.fastjson.jsonobject;
 
import java.io.ioexception;
import java.util.concurrent.timeunit;
 
import io.reactivex.backpressurestrategy;
import io.reactivex.flowable;
import io.reactivex.flowableemitter;
import io.reactivex.flowableonsubscribe;
import io.reactivex.observable;
import io.reactivex.observableemitter;
import io.reactivex.observableonsubscribe;
import io.reactivex.observer;
import io.reactivex.android.schedulers.androidschedulers;
import io.reactivex.annotations.nonnull;
import io.reactivex.disposables.disposable;
import io.reactivex.functions.bifunction;
import io.reactivex.functions.consumer;
import io.reactivex.functions.function;
import io.reactivex.schedulers.schedulers;
import okhttp3.call;
import okhttp3.callback;
import okhttp3.okhttpclient;
import okhttp3.request;
import okhttp3.response;
 
public class mainactivity extends appcompatactivity {
 
  private textview name;
 
  @override
  protected void oncreate(bundle savedinstancestate) {
    super.oncreate(savedinstancestate);
    setcontentview(r.layout.activity_main);
 
    name = (textview) findviewbyid(r.id.name);
    //用来调用下面的方法,监听。
    name.setonclicklistener(new view.onclicklistener() {
      @override
      public void onclick(view v) {
 
        interval();
      }
    });
  }
 
  //例1:observer
  public void observer() {
    //观察者
    observer<string> observer = new observer<string>() {
      @override
      public void onsubscribe(@nonnull disposable d) {
 
      }
      @override
      public void onnext(@nonnull string s) {
        //接收从被观察者中返回的数据
        system.out.println("onnext :" + s);
      }
      @override
      public void onerror(@nonnull throwable e) {
 
      }
      @override
      public void oncomplete() {
 
      }
    };
    //被观察者
    observable<string> observable = new observable<string>() {
      @override
      protected void subscribeactual(observer<!--? super string--> observer) {
        observer.onnext("11111");
        observer.onnext("22222");
        observer.oncomplete();
      }
    };
    //产生了订阅
    observable.subscribe(observer);
  }
 
  //例2:flowable
  private void flowable(){
    //被观察者
    flowable.create(new flowableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull flowableemitter<string> e) throws exception {
        for (int i = 0; i < 100; i++) {
          e.onnext(i+"");
        }
      }
      //背压的策略,buffer缓冲区        观察者
      //背压一共给了五种策略
      // buffer、
      // drop、打印前128个,后面的删除
      // error、
      // latest、打印前128个和最后一个,其余删除
      // missing
      //这里的策略若不是buffer 那么,会出现著名的:missingbackpressureexception错误
    }, backpressurestrategy.buffer).subscribe(new consumer<string>() {
      @override
      public void accept(string s) throws exception {
        system.out.println("subscribe accept"+s);
        thread.sleep(1000);
      }
    });
  }
 
  //例3:线程调度器 scheduler
  public void flowable1(){
    flowable.create(new flowableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull flowableemitter<string> e) throws exception {
        for (int i = 0; i < 100; i++) {
          //输出在哪个线程
          system.out.println("subscribe thread.currentthread.getname = " + thread.currentthread().getname());
          e.onnext(i+"");
        }
      }
    },backpressurestrategy.buffer)
        //被观察者一般放在子线程
        .subscribeon(schedulers.io())
        //观察者一般放在主线程
        .observeon(androidschedulers.mainthread())
        .subscribe(new consumer<string>() {
          @override
          public void accept(string s) throws exception {
            system.out.println("s"+ s);
            thread.sleep(100);
            //输出在哪个线程
            system.out.println("subscribe thread.currentthread.getname = " + thread.currentthread().getname());
          }
        });
  }
 
 
  //例4:http请求网络,map转化器,fastjson解析器
  public void map1(){
    observable.create(new observableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull final observableemitter<string> e) throws exception {
        okhttpclient client = new okhttpclient();
        request request = new request.builder()
            .url("https://qhb.2dyt.com/bwei/login")
            .build();
        client.newcall(request).enqueue(new callback() {
          @override
          public void onfailure(call call, ioexception e) {
 
          }
 
          @override
          public void onresponse(call call, response response) throws ioexception {
            string result = response.body().string();
            e.onnext(result);
          }
        });
      }
    })
        //map转换器 flatmap(无序),concatmap(有序)
        .map(new function<string, bean="">() {
      @override
      public bean apply(@nonnull string s) throws exception {
        //用fastjson来解析数据
        return jsonobject.parseobject(s,bean.class);
      }
    }).subscribe(new consumer<bean>() {
      @override
      public void accept(bean bean) throws exception {
        system.out.println("bean = "+ bean.tostring() );
      }
    });
  }
 
  //常见rxjava操作符
  //例 定时发送消息
  public void interval(){
    observable.interval(2,1, timeunit.seconds)
        .take(10)
        .subscribe(new consumer<long>() {
          @override
          public void accept(long along) throws exception {
            system.out.println("along = " + along);
          }
        });
  }
 
 
  //例 zip字符串合并
  public void zip(){
    observable observable1 = observable.create(new observableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull observableemitter<string> e) throws exception {
        e.onnext("1");
        e.onnext("2");
        e.onnext("3");
        e.onnext("4");
        e.oncomplete();
 
      }
    });
    observable observable2 = observable.create(new observableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull observableemitter<string> e) throws exception {
        e.onnext("a");
        e.onnext("b");
        e.onnext("c");
        e.onnext("d");
        e.oncomplete();
      }
    });
 
    observable.zip(observable1, observable2, new bifunction<string,string,string>() {
      @override
      public string apply(@nonnull string o, @nonnull string o2) throws exception {
        return o + o2;
      }
    }).subscribe(new consumer<string>() {
      @override
      public void accept(string o) throws exception {
        system.out.println("o"+ o);
      }
    });
  }

总结

以上就是本文关于rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:javaweb应用使用限流处理大量的并发请求详解、、java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网