截止日前最新版2017-3-15: RxJava
compile ‘io.reactivex:rxjava:1.2.7’ compile ‘io.reactivex:rxandroid:1.2.1’
RxJava2
compile “io.reactivex.rxjava2:rxjava:2.0.7” compile “io.reactivex.rxjava2:rxandroid:2.0.1”
1:create操作改变 Rxjava
CompositeSubscription compositeSubscription = new CompositeSubscription(); Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("1");subscriber.onNext("2");//e.onComplete();subscriber.onError(new NullPointerException());} }); Subscription subscribe = observable.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {} });//取消订阅 subscribe.unsubscribe(); //或者 compositeSubscription.add(subscribe); compositeSubscription.unsubscribe();
RxJava2
final CompositeDisposable compositeDisposable = new CompositeDisposable(); Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("1");e.onNext("2");//e.onComplete();e.onError(new NullPointerException());} });//subscribe方法返回void类型 observable.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {compositeDisposable.add(d);}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });compositeDisposable.dispose();
2:新增Flowable
Flowable<String> stringFlowable = Flowable.create(new FlowableOnSubscribe<String>() {@Overridepublic void subscribe(FlowableEmitter<String> e) throws Exception {e.onNext("1");e.onNext("2");e.onComplete();//e.onError(new NullPointerException()); }}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread());stringFlowable.subscribe(new Subscriber<String>() {@Overridepublic void onSubscribe(Subscription s) {//s.request(1);// 参数表示接受多少次onNext回调.// 当onNext回调次数和这个参数不一致时, 则通过BackpressureStrategy.ERROR参数决定之后的处理// 不调用等价于request(0).// 典型的错误MissingBackpressureException异常// 注意: Flowable请尽量在异步线程使用,否则很容易出现MissingBackpressureException异常 }@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable t) {}@Overridepublic void onComplete() {} });