Rxjava retryWhen and repeatWhen

retry

Observable发生错误时接收到onError事件,重新发射数据。可以拦截·Throwable 和 Exception

重载方法如下:

	// 一直错误,一直重试
public final Observable<T> retry() {return retry(Long.MAX_VALUE, Functions.alwaysTrue());
}
// 最大重试的次数
public final Observable<T> retry(long times) {return retry(times, Functions.alwaysTrue());
}
// 重试条件
public final Observable<T> retry(Predicate<? super Throwable> predicate) {return retry(Long.MAX_VALUE, predicate);
}
// 重试次数和条件
public final Observable<T> retry(long times, Predicate<? super Throwable> predicate) {if (times < 0) {throw new IllegalArgumentException("times >= 0 required but it was " + times);}ObjectHelper.requireNonNull(predicate, "predicate is null");return RxJavaPlugins.onAssembly(new ObservableRetryPredicate<T>(this, times, predicate));
}public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {ObjectHelper.requireNonNull(predicate, "predicate is null");return RxJavaPlugins.onAssembly(new ObservableRetryBiPredicate<T>(this, predicate));
}

Repeat

无条件地、重复发送 被观察者事件.,具备重载方法,可设置重复创建次数

public final Observable<T> repeat() {return repeat(Long.MAX_VALUE);}public final Observable<T> repeat(long times) {if (times < 0) {throw new IllegalArgumentException("times >= 0 required but it was " + times);}if (times == 0) {return empty();}return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
}

RetryWhen

遇到错误时,将发生的错误传递给一个新的被观察者 Observable, 并根据新被观察者发送的事件,决定是否需要重新订阅原始被观察者Observable & 发送事件

分为两种情况

  1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:该异常错误信息可在观察者中的onError()中获得
  2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件。
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onError(new Exception("error happen."));emitter.onNext(4);}})// 上游遇到error时回调.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {// 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型// 返回Observable<?> = 新的被观察者 Observable(任意类型)// throwableObservable 必须被处理,不然只会发送上游发送error事件return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {/*** 1. emit error. 不会重新发射数据。异常传递到观察的的onError中* 10:54:26.148 com...mple.test_android  D  接收到了事件1* 10:54:26.148 com...mple.test_android  D  接收到了事件2* 10:54:26.148 com...mple.test_android  D  接收到了事件3* 10:54:26.148 com...mple.test_android  D  对Error事件作出响应java.lang.Throwable: retry stop!*///  return Observable.error(new Throwable("retry stop!"));/*** 2. emit onNext* 原始的Observable则重新发送数据* 10:57:22.759 com...mple.test_android  D  接收到了事件1* 10:57:22.759 com...mple.test_android  D  接收到了事件2* 10:57:22.759 com...mple.test_android  D  接收到了事件3* 10:57:22.759 com...mple.test_android  D  接收到了事件1* 10:57:22.759 com...mple.test_android  D  接收到了事件2* 10:57:22.759 com...mple.test_android  D  接收到了事件3* 10:57:22.759 com...mple.test_android  D  接收到了事件1* 10:57:22.759 com...mple.test_android  D  接收到了事件2* 10:57:22.759 com...mple.test_android  D  接收到了事件3*/return Observable.just(true);}});}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer value) {Log.d(TAG, "接收到了事件" + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应" + e.toString());// 获取异常错误信息}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});

RepeatWhen

有条件地、重复发送 被观察者事件。 将原始 Observable 停止发送事件的标识(Complete() / Error())。转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable

  1. 若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
  2. 若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable
Observable.just(1, 2, 3, 4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {@Override// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Object o) throws Exception {/***  1. 发送onComplete事件,不会重新发送原来的数据 但不会回调观察者的onComplete()* 11:03:43.908 com...mple.test_android  D  开始采用subscribe连接* 11:03:43.908 com...mple.test_android  D  接收到了事件1* 11:03:43.908 com...mple.test_android  D  接收到了事件2* 11:03:43.908 com...mple.test_android  D  接收到了事件3* 11:03:43.908 com...mple.test_android  D  接收到了事件4 *///return Observable.empty();/*** 2. 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。*  * 11:05:38.118 com...mple.test_android  D  开始采用subscribe连接*  * 11:05:38.119 com...mple.test_android  D  接收到了事件1*  * 11:05:38.119 com...mple.test_android  D  接收到了事件2*  * 11:05:38.119 com...mple.test_android  D  接收到了事件3*  * 11:05:38.119 com...mple.test_android  D  接收到了事件4*  * 11:05:38.121 com...mple.test_android  D  对Error事件作出响应:java.lang.Throwable: repeat when stop!*///return Observable.error(new Throwable("repeat when stop!"));/*** 3.若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable* 11:07:23.876 com...mple.test_android  D  开始采用subscribe连接* 11:07:23.877 com...mple.test_android  D  接收到了事件1* 11:07:23.877 com...mple.test_android  D  接收到了事件2* 11:07:23.877 com...mple.test_android  D  接收到了事件3* 11:07:23.877 com...mple.test_android  D  接收到了事件4* 11:07:23.877 com...mple.test_android  D  接收到了事件1* 11:07:23.877 com...mple.test_android  D  接收到了事件2* 11:07:23.877 com...mple.test_android  D  接收到了事件3* 11:07:23.877 com...mple.test_android  D  接收到了事件4* 11:07:23.877 com...mple.test_android  D  接收到了事件1* 11:07:23.877 com...mple.test_android  D  接收到了事件2* 11:07:23.877 com...mple.test_android  D  接收到了事件3* 11:07:23.877 com...mple.test_android  D  接收到了事件4*/return Observable.just(1);}});}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "开始采用subscribe连接");}@Overridepublic void onNext(Integer value) {Log.d(TAG, "接收到了事件" + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应:" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});

RetryWhen 和 RepeatWhen组合完成轮询请求

private int i = 0;
public void repeatAndRetryWhen() {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);//emitter.onError(new Throwable("error happened!!")); // error走retryWhenemitter.onNext(3);emitter.onComplete(); // 顺利完成走repeatWhen}}).repeat().retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {// 超出最大请求次数或者这个throwable是结束条件,发送onError传递到下游if (i > 4) {return Observable.error(new Throwable("stop retry!"));}// 延迟5s后进行重试return Observable.just(1).delay(5, TimeUnit.SECONDS);}});}}).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {// 正常结束后10s开始轮询return objectObservable.delay(10, TimeUnit.SECONDS);}}).doFinally(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "Finally!!");}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "开始采用subscribe连接");}@Overridepublic void onNext(Integer value) {i++;Log.d(TAG, "接收到了事件" + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应:" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});}12:53:10.226 com...mple.test_android  D  开始采用subscribe连接
12:53:10.238 com...mple.test_android  D  接收到了事件1
12:53:15.242 com...mple.test_android  D  接收到了事件1
12:53:20.245 com...mple.test_android  D  接收到了事件1
12:53:25.248 com...mple.test_android  D  接收到了事件1
12:53:30.253 com...mple.test_android  D  接收到了事件1
12:53:30.281 com...mple.test_android  DError事件作出响应:java.lang.Throwable: stop retry!
12:53:30.281 com...mple.test_android  D  Finally!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/63869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM-内存溢出的原因、CPU占满的原因

1.内存溢出的原因 OOM的排查思路_oom排查_java排坑日记的博客-CSDN博客 每个进程的内存&#xff08;限制&#xff0c;譬如2G&#xff09;最大堆容量最大方法区容量程序计数器虚拟机栈和本地方法栈。多线程下每个线程栈越大&#xff0c;越容易OOM. 1.堆内存溢出&#xff08;OO…

网易24届内推

【网易】2024届网易互联网秋季校园招聘内推开始啦&#xff01;给你分享我的专属内推邀请函&#xff1a;https://bole.campus.163.com/campus/home?projectId55&type99&isShare1&boleId7b842acc7c2b42db&boleType2&signatured5f2a3dc23bed70777a8be1a14b49…

opendds qos策略之HISTORY

HISTORY策略的含义 顾名思义&#xff0c;该策略会影响历史样本数据的保存策略&#xff0c;即历史样本数据保存策略。这个策略是针对instance的&#xff0c;一个datawriter或者datareader可以根据key成员为设置多个instance&#xff0c;每个instance也可以独立的保存多个历史样…

经管博士必备基础【12】包络定理

当我们知道一个函数的最优解时&#xff0c;我们要求解这一个函数的值函数关于函数中某一个参数的导数&#xff0c;那么就可以使用包络定理。 1. 无约束条件下的包络定理 函数在其极值点处对一个参数&#xff08;参数不是自变量&#xff09;取偏导数的结果&#xff0c;等价于这…

软件工程(十七) 行为型设计模式(三)

1、观察者模式 简要说明 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并自动更新 速记关键字 联动,广播消息 类图如下 基于上面的类图,我们来实现一个监听器。类图中的Subject对应我们的被观察对象接口(IObservable),…

【人工智能】—_神经网络、M-P_神经元模型、激活函数、神经网络结构、学习网络参数、代价定义、总代价

M-P_神经元模型、激活函数、神经网络结构、学习网络参数、代价定义 文章目录 M-P_神经元模型、激活函数、神经网络结构、学习网络参数、代价定义 M-P 神经元模型激活函数(Activation function)神经网络结构举例训练神经网络学习网络参数代价定义均方误差交叉熵&#xff08;Cros…

Qt +VTK+Cmake 编译和环境配置(第三篇,高级篇, 已解决)

上篇说了&#xff0c;Cmake 虽然可以成功的build&#xff0c;但是大部分人都选择的是VS编译&#xff0c;没有人选择Qt自带的编译器编译。 在build文件夹 shift右键 进入cmd串口&#xff0c;执行mingw32-make mingw32-make 报错&#xff01;&#xff01;&#xff01;&#x…

前后端分离项目,整合成jar包,刷新404或空白页,解决方法

问题解决 1、注销遇到404&#xff0c;或刷新遇到404 # 添加错误跳转 Component public class ErrorConfig implements ErrorPageRegistrar {Overridepublic void registerErrorPages(ErrorPageRegistry registry) {ErrorPage error404Page new ErrorPage(HttpStatus.NOT_FOU…

VSCode下载、安装及配置、调试的一些过程理解

第一步先下载了vscode&#xff0c;官方地址为&#xff1a;https://code.visualstudio.com/Download 第二步安装vscode&#xff0c;安装环境是win10&#xff0c;安装基本上就是一步步默认即可。 第三步汉化vscode&#xff0c;这一步就是去扩展插件里面下载一个中文插件即可&am…

ArcGIS地块面积分割调整工具插件

地块分割调整工具可以实现将选定的图斑按照面积比例或者指定的面积&#xff0c;分割成多个图斑。 各个图斑的面积用逗号分隔&#xff0c;比例分割设置时&#xff0c;用整数表示。 面积分割时&#xff0c;最后一个图斑的面积可以不写&#xff0c;插件可以自动计算图斑的面积&a…

【常用代码】折半插入算法

王道的折半插入排序的代码太难懂了&#xff0c;上网搜了个这个看着比较好用 并且这个算法应该是不稳定的&#xff0c;同样的数值&#xff0c;后进来的插到前面去了。 // C program for implementation of // binary insertion sort #include <stdio.h>// A binary searc…

c++ 学习 之 构造函数的分类和调用类型 深入学习

正文 构造函数是在C中用于创建和初始化对象的特殊函数。构造函数可以根据不同的特性和参数进行分类&#xff0c;以下是一些常见的构造函数分类和详细讲解它们的调用方式&#xff1a; 默认构造函数&#xff1a; 默认构造函数是一个特殊的构造函数&#xff0c;它没有参数&#x…

day-36 代码随想录算法训练营(19)part05

435.无重叠区间 思路&#xff1a;首先对数组排序&#xff0c;只需要关注重叠区间就行&#xff0c;有重叠时计数1&#xff0c;然后更新当前右边界为重叠区间中的最小右边界。 763.划分字母区间 思路&#xff1a;记录每一个字母的最远位置&#xff0c;然后从头开始遍历&#xf…

Java如何发起http的get请求的实现

加哥最近做第三方接口开发&#xff0c;对方提供的是get方式的http请求&#xff0c;下面加哥给大家进行了总结如何用java代码去发送http请求并获取结果。 下面是发送get请求的工具类 1.不要求携带token的方式 public static String getUrl(String tempurl,String bm) {String…

ARM DIY(四)WiFi 调试

文章目录 焊接打开内核编译选项重新编译内核烧录 && 运行 && 测试完善脚本测速手搓天线正式天线 焊接 换个粗点的风枪嘴&#xff0c;让热风覆盖 RTL8823BS 整体模块&#xff0c;最终实现自动归位 焊接 SDIO 接口的上拉电阻以及复位引脚上拉电阻 硬件部分就这…

【C++】map和set

map和set 文章目录 map和set关联式容器setset介绍set的函数测试代码 multiset注意事项测试代码 mapmap介绍map的函数测试代码 关联式容器 前面了解过的vector&#xff0c;list&#xff0c;string等容器都是序列式容器&#xff0c;存储的都是元素本身&#xff0c;底层都是线性的…

七、Kafka-Kraft 模式

目录 7.1 Kafka-Kraft 架构7.2 Kafka-Kraft 集群部署 7.1 Kafka-Kraft 架构 左图为 Kafka 现有架构&#xff0c;元数据在 zookeeper 中&#xff0c;运行时动态选举 controller&#xff0c;由controller 进行 Kafka 集群管理 右图为 kraft 模式架构&#xff08;实验性&#xff…

001图机器学习与图神经网络简介

文章目录 一. 无处不在的图二. 如何对图数据做信息挖掘三. 图神经网络四. 图机器学习常用的编程工具五. 图的可视化工具六. 常见的图数据库七. 图机器学习的应用举例八. 结束语 一. 无处不在的图 一切具有关联关系的数据都可以用图来表示。比如&#xff1a;交通网、知识图谱、…

Vim如何清空文件

在Vim中&#xff0c;可以使用以下命令清空文件内容&#xff1a; 打开需要清空的文件&#xff1a;在终端中输入vim filename打开文件&#xff0c;其中filename是你要编辑的文件名。 进入命令模式&#xff1a;按下键盘上的Esc键&#xff0c;确保处于Vim的命令模式。&#xff08;…

C# excel与DataTable之间的转换

注意&#xff0c;Excel读入DataTable需要使用NPOI包 /// <summary>/// Excel导入成Datable/// </summary>/// <param name"file">导入路径(包含文件名与扩展名)</param>/// <returns></returns>public static DataTable ExcelT…