retry
当Observable
发生错误时接收到onError
事件,重新发射数据。可以拦截·Throwable 和 Exception
。
重载方法如下:
// 一直错误,一直重试
public final Observable<T> retry() {return retry(Long.MAX_VALUE, Functions.alwaysTrue());
}
// 最大重试的次数
public final Observable<T> retry(long times) {return retry(times, Functions.alwaysTrue());
}
// 重试条件
public final Observable<T> retry(Predicate<? super Throwable> predicate) {return retry(Long.MAX_VALUE, predicate);
}
// 重试次数和条件
public final Observable<T> retry(long times, Predicate<? super Throwable> predicate) {if (times < 0) {throw new IllegalArgumentException("times >= 0 required but it was " + times);}ObjectHelper.requireNonNull(predicate, "predicate is null");return RxJavaPlugins.onAssembly(new ObservableRetryPredicate<T>(this, times, predicate));
}public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {ObjectHelper.requireNonNull(predicate, "predicate is null");return RxJavaPlugins.onAssembly(new ObservableRetryBiPredicate<T>(this, predicate));
}
Repeat
无条件地、重复发送 被观察者事件.,具备重载方法,可设置重复创建次数
public final Observable<T> repeat() {return repeat(Long.MAX_VALUE);}public final Observable<T> repeat(long times) {if (times < 0) {throw new IllegalArgumentException("times >= 0 required but it was " + times);}if (times == 0) {return empty();}return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
}
RetryWhen
遇到错误时,将发生的错误传递给一个新的被观察者 Observable
, 并根据新被观察者发送的事件,决定是否需要重新订阅原始被观察者Observable
& 发送事件
分为两种情况
- 若 新的被观察者
Observable
发送的事件= Error
事件,那么 原始Observable
则不重新发送事件:该异常错误信息可在观察者中的onError()
中获得 - 若 新的被观察者
Observable
发送的事件= Nex
t事件 ,那么原始的Observable
则重新发送事件。
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onError(new Exception("error happen."));emitter.onNext(4);}})// 上游遇到error时回调.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {// 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型// 返回Observable<?> = 新的被观察者 Observable(任意类型)// throwableObservable 必须被处理,不然只会发送上游发送error事件return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {/*** 1. emit error. 不会重新发射数据。异常传递到观察的的onError中* 10:54:26.148 com...mple.test_android D 接收到了事件1* 10:54:26.148 com...mple.test_android D 接收到了事件2* 10:54:26.148 com...mple.test_android D 接收到了事件3* 10:54:26.148 com...mple.test_android D 对Error事件作出响应java.lang.Throwable: retry stop!*/// return Observable.error(new Throwable("retry stop!"));/*** 2. emit onNext* 原始的Observable则重新发送数据* 10:57:22.759 com...mple.test_android D 接收到了事件1* 10:57:22.759 com...mple.test_android D 接收到了事件2* 10:57:22.759 com...mple.test_android D 接收到了事件3* 10:57:22.759 com...mple.test_android D 接收到了事件1* 10:57:22.759 com...mple.test_android D 接收到了事件2* 10:57:22.759 com...mple.test_android D 接收到了事件3* 10:57:22.759 com...mple.test_android D 接收到了事件1* 10:57:22.759 com...mple.test_android D 接收到了事件2* 10:57:22.759 com...mple.test_android D 接收到了事件3*/return Observable.just(true);}});}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer value) {Log.d(TAG, "接收到了事件" + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应" + e.toString());// 获取异常错误信息}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});
RepeatWhen
有条件地、重复发送 被观察者事件。 将原始 Observable
停止发送事件的标识(Complete() / Error())
。转换成1个 Object 类型数据传递给1个新被观察者(Observable
),以此决定是否重新订阅 & 发送原来的 Observable
。
- 若新被观察者(
Observable
)返回1个Complete / Error
事件,则不重新订阅 & 发送原来的Observable
- 若新被观察者(
Observable
)返回其余事件时,则重新订阅 & 发送原来的Observable
Observable.just(1, 2, 3, 4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {@Override// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Object o) throws Exception {/*** 1. 发送onComplete事件,不会重新发送原来的数据 但不会回调观察者的onComplete()* 11:03:43.908 com...mple.test_android D 开始采用subscribe连接* 11:03:43.908 com...mple.test_android D 接收到了事件1* 11:03:43.908 com...mple.test_android D 接收到了事件2* 11:03:43.908 com...mple.test_android D 接收到了事件3* 11:03:43.908 com...mple.test_android D 接收到了事件4 *///return Observable.empty();/*** 2. 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。* * 11:05:38.118 com...mple.test_android D 开始采用subscribe连接* * 11:05:38.119 com...mple.test_android D 接收到了事件1* * 11:05:38.119 com...mple.test_android D 接收到了事件2* * 11:05:38.119 com...mple.test_android D 接收到了事件3* * 11:05:38.119 com...mple.test_android D 接收到了事件4* * 11:05:38.121 com...mple.test_android D 对Error事件作出响应:java.lang.Throwable: repeat when stop!*///return Observable.error(new Throwable("repeat when stop!"));/*** 3.若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable* 11:07:23.876 com...mple.test_android D 开始采用subscribe连接* 11:07:23.877 com...mple.test_android D 接收到了事件1* 11:07:23.877 com...mple.test_android D 接收到了事件2* 11:07:23.877 com...mple.test_android D 接收到了事件3* 11:07:23.877 com...mple.test_android D 接收到了事件4* 11:07:23.877 com...mple.test_android D 接收到了事件1* 11:07:23.877 com...mple.test_android D 接收到了事件2* 11:07:23.877 com...mple.test_android D 接收到了事件3* 11:07:23.877 com...mple.test_android D 接收到了事件4* 11:07:23.877 com...mple.test_android D 接收到了事件1* 11:07:23.877 com...mple.test_android D 接收到了事件2* 11:07:23.877 com...mple.test_android D 接收到了事件3* 11:07:23.877 com...mple.test_android D 接收到了事件4*/return Observable.just(1);}});}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "开始采用subscribe连接");}@Overridepublic void onNext(Integer value) {Log.d(TAG, "接收到了事件" + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应:" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});
RetryWhen 和 RepeatWhen组合完成轮询请求
private int i = 0;
public void repeatAndRetryWhen() {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);//emitter.onError(new Throwable("error happened!!")); // error走retryWhenemitter.onNext(3);emitter.onComplete(); // 顺利完成走repeatWhen}}).repeat().retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {// 超出最大请求次数或者这个throwable是结束条件,发送onError传递到下游if (i > 4) {return Observable.error(new Throwable("stop retry!"));}// 延迟5s后进行重试return Observable.just(1).delay(5, TimeUnit.SECONDS);}});}}).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {// 正常结束后10s开始轮询return objectObservable.delay(10, TimeUnit.SECONDS);}}).doFinally(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "Finally!!");}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "开始采用subscribe连接");}@Overridepublic void onNext(Integer value) {i++;Log.d(TAG, "接收到了事件" + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应:" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});}12:53:10.226 com...mple.test_android D 开始采用subscribe连接
12:53:10.238 com...mple.test_android D 接收到了事件1
12:53:15.242 com...mple.test_android D 接收到了事件1
12:53:20.245 com...mple.test_android D 接收到了事件1
12:53:25.248 com...mple.test_android D 接收到了事件1
12:53:30.253 com...mple.test_android D 接收到了事件1
12:53:30.281 com...mple.test_android D 对Error事件作出响应:java.lang.Throwable: stop retry!
12:53:30.281 com...mple.test_android D Finally!!