Rxjava2.x
- 微软的一个函数库,Rx是一个编程模型,模板是提供一致的编程接口,帮助开发者更方便的处理异步数据流,现在Rx已经支持几乎全部的流行编程语言。比较流行的有Rxjava,RxJs,Rx.NET,社区网站:http://reactivex.io/
- Rx使用观察者模式
- 使用观察者模式监听:RX可以订阅任何可观察的数据流并且执行操作
- 组合:RX使用查询式的操作符和变换数据流
- 创建:Rx可以方便的创建事件流和数据流
- 简化代码
- 函数式风格:对可观察数据流使用无副作用的输入输出函数,避免程序中错综复杂的状态
- 简化代码:Rx的操作符通常可以将复杂难题简单化为很少的代码
- 异步错误处理: 传统的try/catch没有办法处理异步计算,Rx提供了合适的错误处理机制
- 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层线程同步和各种并发问题。
- jar包maven仓库地址
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>2.2.6</version>
</dependency>
-
一个简单的例子:
public void myTestObservable(){Observable.fromIterable(Lists.newArrayList(1,2,3,4,5)).filter(integer -> {return integer > 2;}).subscribe(integer -> {System.out.println(Thread.currentThread().getName() + " : "+ integer );});}
- 在Rxjava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable类的实例。订阅者(Subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发的操作,因为他不需要阻塞等待Observable发射数据,而是创建一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。
-
Observable
- 如上图是Observable发射数据的一个流程图,
- 时间线 左边 ----右边, 各个不同形状标识Observable上的元素
- 最后垂直线表示Observable成功执行
- 向下虚线箭标识数据被发射出去
- 盒子表示各种操作符来对对应数据进行处理
- 第二条时间线也是一个Observable,不过是转换之后的
- 当转换时候出现错误将会并不会终止,他会用一个error事件替代这个位置
-
Subscribe
- 观察者通过SubScribe操作关联Observable
-
Observer
- 观察者,决定了事件触发的时候将有怎么样的行为
void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete();
- onNext事件,被观察者每发射一个数据都会调onNext事件,相当于异步任务中的回调接口,相当于Feature的get获取,只不过onNext不是阻塞的就是一个哨兵模式,每次发射数据,获取立即获取对应结果,然后执行之后的逻辑
- onCompleted事件,表示事件队列完成,Rxjava不仅把每一个事件单独处理,还会把他看做一个队列,Rxjava规定,当不会再发射新的元素触发onNext事件时候,需要触发onCompleted事件作为结束标志。
- onError事件,事件队列处理中出现异常时候,onError会被触发,可以在onError中统一处理异常情况
- onSubScribe事件,表示元素开始发射,相当于所有元素执行之前的一个预处理位置。
-
Schedulers
- 默认情况下Observable和Observer执行过程是在同一个线程执行如上面最简单例子,如果想要切换线程在不同线程执行可以用SubscribeOn(),observeOn()。
- Rxjava提供了几种线调度器
调度器类型 | 效果 |
---|---|
Schedulers.newThread(); | 为每一个任务创建一个新线程 |
Schedulers.computation(); | 用于计算任务,如时间循环和回调处理,不要用于IO操作,默认线程数等于处理器数量 |
Schedulers.io(); | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对应普通计算任务,一般用上面这个,Schedulers.io默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。 |
Schedulers.single(); | 拥有一个线程单例的线程池,所有任务都在这一个线程中执行,当线程中有任务执行时候,他的任务将会按照先进先出的顺序依次执行。 |
Schedulers.trampoline(); | Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. 在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。 |
- 来一个完整的例子来解释一下线程切换:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (Integer integer : intList) {System.out.println(Thread.currentThread().getName() + " : send");emitter.onNext(integer);}emitter.onComplete();}}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).flatMap(new Function<Integer, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Integer integer) throws Exception {return Observable.just(integer).subscribeOn(Schedulers.computation()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {System.out.println(Thread.currentThread().getName() + ": filter one integer: "+ integer);return integer > 2;}});}}).observeOn(Schedulers.io()).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println(Thread.currentThread().getName()+ " : onSubscribe");}});
这个代码看起来比较长,也可以这么写:
Observable.create(emitter -> {intList.forEach(intTemp -> emitter.onNext(intTemp));emitter.onComplete();}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).flatMap(intStr -> Observable.just(intStr).subscribeOn(Schedulers.computation()).filter(filterInt -> Integer.valueOf(filterInt.toString()) > 2)).observeOn(Schedulers.computation()).subscribe(intTemp -> System.out.println(intTemp));
- 第一个subscribeOn指定被观察对象发射的线程,使用的computation模型
- 第一个observeOn指定之后的flatMap操作符切换到另外线程中执行
- 最后的observeOn指定观察者哨兵消费数据的线程,会有如下结果
- Observable的这种异步切换线程方式从整体流程来看还是同步的方式,他必须先Observable发射数据-----操作符change-----消费数据并不是每次发射一个数据的同时进行change接着消费的并行实现,因此Rxjava提供了另外一个并行的方式,如下案例
public static void flowableDemo() throws InterruptedException {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 1; i < 100; i++) {System.out.println(Thread.currentThread().getName() + " 发射数据");emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {System.out.println(Thread.currentThread().getName() + " 过滤发射数据");return integer > 0;}}).observeOn(Schedulers.newThread()).subscribe(new Subscriber<Object>() {public void onSubscribe(Subscription subscription) {System.out.println("取出n条数据");subscription.request(3);}public void onNext(Object o) {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());System.out.println("消费数据:" + o);}public void onError(Throwable throwable) {System.out.println(throwable.getMessage());}public void onComplete() {System.out.println("onComplete");}});}
- 用Flowable不仅仅是对每个模块进行了线程的切换,在同时也是并行的执行了整个流程
- 我觉得在异步编程方面Rxjava的确比原始的Thread ,Runnable这类操作要方便的多,通过几个操作符就可以达到异步的目的,这也是Rxjava的一个优势,而在我们工作中我们一般都是用架构组的异步框架也可以做到很优雅的异步编程,比起Rxjava而言只是会多创建一个异步类而已,那么我们来对比一下两种异步操作,我用之前的批量关注接口做测试
- 原来的异步方式:
public void batchFollowPush(UserInfo userInfo, List<UserInfo> objectUserInfos, Integer source) {List<Boolean> batchFollowResult = new ArrayList<>();Future<Boolean> pushFuture = null;for (UserInfo objectUserInfo : objectUserInfos) {try {pushFuture = (Future<Boolean>)pushAsync.asyncFollowAndStoreMoment(userInfo, objectUserInfo, source);batchFollowResult.add(pushFuture.get());} catch (Exception e) {if (pushFuture != null) {pushFuture.cancel(true);}throw new RuntimeException(e);}}log.info("batchFollow result:{}", JSON.toJSONString(batchFollowResult));}//异步类中方法public Object asyncFollowAndStoreMoment(UserInfo userInfo, UserInfo objectUserInfo, Integer source) {// 关注交互限制Optional<InteractError> interactErrorOptional = personFacade.interactRestrict(userInfo, objectUserInfo);if (interactErrorOptional.isPresent()) {return Boolean.FALSE;}int result = wooerFacade.addOrUpdatePush2(userInfo.getMemberID(), objectUserInfo.getMemberID(), 1, source, HeadUaUtils.getPlatform());Boolean isTrue = result > 0;if (isTrue == null || !isTrue) {return Boolean.FALSE;}limitedPushFacade.followPush(userInfo, objectUserInfo);MomentListStoreParam momentListStoreParam =new MomentListStoreParam(Arrays.asList(Long.valueOf(objectUserInfo.getMemberID())),userInfo.getMemberID().longValue());log.info("batchFollow afterFilter param:{}", JSON.toJSONString(momentListStoreParam));momentManagerFacade.storeMomentMongoByMomentId(momentListStoreParam);return Boolean.TRUE;}
- RX方式
public IResponse batchFollowRx(BatchFollowForm batchFollowForm, UserInfo myUserInfo) {log.info("batchFollow param:{}", JSON.toJSONString(batchFollowForm));BusinessAssert.isNotNull(batchFollowForm, CommonError.ARGS_EMPTY);List<Long> objectIDs = batchFollowForm.getObjectIDs();List<UserInfo> objectUserInfo = coreUserService.getList(objectIDs, UserInfo.class);Flowable.create(new FlowableOnSubscribe<UserInfo>() {@Overridepublic void subscribe(FlowableEmitter<UserInfo> emitter) throws Exception {for (UserInfo info : objectUserInfo) {emitter.onNext(info);}}}, BackpressureStrategy.ERROR).parallel().runOn(Schedulers.io()).filter(userInfo -> {Optional<InteractError> interactErrorOptional = personFacade.interactRestrict(myUserInfo, userInfo);if (interactErrorOptional.isPresent()) {return Boolean.FALSE;}Boolean isTrue =wooerFacade.addOrUpdatePush2(myUserInfo.getMemberID(), userInfo.getMemberID(),1, batchFollowForm.getSource(), HeadUaUtils.getPlatform()) > 0;if (isTrue == null || !isTrue) {return Boolean.FALSE;}return Boolean.TRUE;}).runOn(Schedulers.computation()).sequential().subscribe(new Consumer<UserInfo>() {@Overridepublic void accept(UserInfo userInfo) throws Exception {limitedPushFacade.followPush(myUserInfo, userInfo);MomentListStoreParam momentListStoreParam =new MomentListStoreParam(Arrays.asList(Long.valueOf(userInfo.getMemberID())),userInfo.getMemberID().longValue());log.info("batchFollow afterFilter param:{}", JSON.toJSONString(momentListStoreParam));momentManagerFacade.storeMomentMongoByMomentId(momentListStoreParam);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {log.error("batch follow error exception:{}", throwable.getMessage());}});return MessageResponse.success(FOLLOW_SUCCESS);}
-
测试批量关注接口,相同容器环境在同一个namespace上,并且相同条件的男性账号,关注同一批13个异性用户:
-
普通callback异步方式:
-
Rxjava方式
-
-
backPressure
- 以上的例子中每次都用到了BackpressureStrategy操作符这个是Rxjava2.x后为了解决背压问题的一个操作符,所谓背压,即异步的情况下发射数据的速度大于消费数据的速度带来的问题。
- BackPressure现象说明:Flowable无限的生产事件,但是SubScribe消费的速度很慢,导致事件堆积,当堆积到一定程度将会造成OOM,我们模拟一下这种情况。
public static void oomDemo(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i=0;;i++){System.out.println(Thread.currentThread().getName() + " onNext : "+ i);emitter.onNext(i);}}}).subscribeOn(Schedulers.io()).observeOn(Schedulers.single()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " consumer : "+ integer);}});}
- 让发射数据在多个线程中执行,让消费数据在一个线程中执行并且每两秒才消费一个,这样会导致发射的数据不断的累积在内存中,最终可能会导致oom,我们通过内存信息来看他执行之后一段时间的堆内存信息
- PSYoiungGen 年轻态区,总共1024k,使用了511k
- eden区域是新对象区,已经被沾满
- from和to区域 大学是一样,在gc时候会遍历from或者to区域,将不能清除的拷贝到另外一个区,然后清除本区域留下的,然后循环
- paroldGen 老年代区域也已经被占满
- 这种状态下Observable因内存不够已经oom,停止运行了,只有消费线程在消费数据。
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.OutOfMemoryError: GC overhead limit exceededat io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:69)at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
- 我们用一个Flowable的例子来看他如何解决这个oom的问题:
public static void oomDemoFix(){Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i=0;;i++){System.out.println(Thread.currentThread().getName() + " onNext : "+ i);emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(50);}@Overridepublic void onNext(Integer integer) {System.out.println(Thread.currentThread().getName() + "消费数据: "+ integer);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}
- 我们在创建Flowable的时候增加了一个参数 BackpressureStrategy.ERROR,这个参数指定了在处理背压问题时候执行的一个策略,当内存满时候接下来继续发射的数据将会抛出MissingBackpressureException 异常,其余的策略稍等介绍
- 还有另外一个不同 onSubscribe中传递的不是Disposable变成了Subscription,而且还执行了这句代码subscription.request(50)。因为在Flowable中采用了一个新的思路,响应获取发射数据的方法来解决流量不均匀而造成的oom的问题,也就是我要消费多少我就取多少,这里我们从发射数据中取出了50条。其他的还是会存储在内存中。
Flowable中主要有这几个策略
- BackpressureStrategy.ERROR:如果缓存池(默认128)溢出会立刻抛异常MissingBackpressureexception
- BackpressureStrategy.BUFFER:RxJava中有一个默认能够存储128个事件的缓存池,可以调节大小,生产者生产事件,并且将处理不了的事件缓存。(谨慎使用,因为依赖对消费者消费能力,耗内存)
- BackpressureStrategy.DROP:消费不了就丢弃,比如先生产200个,并没有消费,而是在缓存,然后消费者request(200),但是缓存只有128个,所以其他的事件都丢弃了。
- BackpressureStrategy.LATEST:和DROP功能基本一致,处理不了丢弃,区别在于LATEST能够让消费者接收到生产者产生的最后一个事件。
- BackpressureStrategy.MISSING:生产的事件没有进行缓存和丢弃,下游接收到的事件必须进行消费或者处理!
感觉这些都是缓兵之计,能否按照我的消费能力来发射数据呢,这样才完美。
- Rxjava2.x后有一个FlowableEmitter 这个接口:
public static void fix(Flowable flowable){flowable.observeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(20);}@Overridepublic void onNext(Integer integer) {System.out.println("消费数据: "+ 100);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}public static Flowable flowableEmitterDemo(){Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {while (emitter.requested() > 0){System.out.println("下游处理能力:"+ emitter.requested());emitter.onNext(100);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());return flowable;}
- 我可以在Flowable发射数据之前通过requested来获取下游Subscriber的消费能力,依据这个来进行数据的发射,这样既可以控制发射以及消费数据的速度,也能够避免数据的丢失
现在我们看下开始时候从官网摘抄的Rx的几个优点:
-
首先函数式风格,这种编程模式和常规的方式比较的确简化了不少代码比如第一个案例,但是感觉我们用stream表达式加上lambda也可以达到这种效果,而且对于map,flatmap,filter等这些操作符对于没有函数式编程的人来说并不好理解不觉得这是优势
-
简化代码,这点主要体现在异步编程模式时候,不管和我们java中的异步编程用的Thread和Runnable相比,还是我们框架中的异步编程框架比较的确代码都更加简单,只需要通过几个异步线程切换的操作符便可以达到目的,但是缺点也很明显,需要引入新的jar,新的技术对不熟悉这块技术的同事并不友好有一定学习成本不利于维护。
-
异步错误处理,轻松使用并发:通过onError捕获异常信息,通过操作法切换线程,的确也是优势所在。
-
在之前的实践中还有这种业务模型下使用Rxjava会更具优势,当我们需要从多个网络环境来获取各自信息从中筛选出符合我们预期的并对其进行组合,我们可以通过Rxjava的丰富的操作符以及异步操作来完成。来一个简单的例子
public static Flowable getIOData1(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 0; i < 10; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 2);}public static Flowable getIOData2(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 10; i < 21; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 12);}public static Flowable getIOData3(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 20; i < 30; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 22);}public static Flowable getIOData4(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 30; i < 41; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 32);}public static void mergeDemo(){Flowable.merge(getIOData1(), getIOData2(), getIOData3(), getIOData4()).map(temp -> "onNext"+ temp).flatMap(new Function() {@Overridepublic Object apply(@NonNull Object o) throws Exception {return Flowable.just(o).subscribeOn(Schedulers.io());}}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.computation()).subscribe(new Subscriber() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(Long.MAX_VALUE);}@Overridepublic void onNext(Object o) {System.out.println("onNext: "+ o);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}
- 我们定义N个Flowable用异步方式分别请求各个第三方接口来获取对应的数据并且用filter过滤出我们需要的信息,然后通过merge操作法将所有获取到的数据组合到同一个Flowable中,进行统一的封装处理以及之后的一些业务操作。
- 如果我们用传统的方式,我们不得不定义N个变量来获取四个异步线程数据,然后等都获取完毕之后,在分别对四个变量中保存的信息进行筛选,之后通过逻辑操作合并到一起,和rxjava相比显然要逊色很多。
- 这种方式就是Flowable通过内置操作符对自身发射的数据在空间维度上重新组织,或者和其他Flowable发射的数据一起在空间维度上进行重新组织,是的观察者的逻辑变得更加的简单直观因为直接看操作符就能知道具体做了什么,不需要关心数据从哪里来这部分由Flowable屏蔽了,从而使得观察者更加专注于下游逻辑。
RxJava的响应式优势只有在异步逻辑占主导时才会体现出来.
- wiki地址:https://github.com/ReactiveX/RxJava/wiki
- reactivex官网:http://reactivex.io/