响应式编程
- 1.多维度看全景
- 1.1响应式编程(Reactive Programming )
- 1.2函数式编程(Functional Programming, 简称FP)
- 1.3技术演进
- 1.4Rx是什么
- 1.5[响应式宣言](https://www.reactivemanifesto.org/zh-CN)
- 2.钻进去看本质
- 2.1名称解释(rajava)
- 2.2观察者模式
- 2.3HelloWorld
- 2.4 Observable操作符
- 2.5 EventBus vs RxJava
- 3.参考资料
1.多维度看全景
1.1响应式编程(Reactive Programming )
为了直观地了解什么是响应式,我们先从一个大家都比较熟悉的类比开始。首先打开Excel,在B、C、D三列输入如下公式:
B、C和D三列每个单元格的值均依赖其左侧的单元格,当我们在A列依次输入1、2和3时,变化会自动传递到了B、C和D三列,并触发相应状态变更,如下图:
我们可以把A列从上到下想象成一个数据流,每一个数据到达时都会触发一个事件,该事件会被传播到右侧单元格,后者则会处理事件并改变自身的状态。这一系列流程其实就是反应式的核心思想。
通过这个例子,你应该能感受到响应式的核心是数据流(data stream)。下面我们再来看一个例子。我们很多人每天都会坐地铁上下班,地铁每两分钟一班,并且同一条轨道会被很多地铁共享,你会不会因为担心追尾而不敢坐首尾两节车厢呢? 其实如果采用反应式架构构建地铁系统,就无需担心追尾问题。在反应式系统中,每辆地铁都会实时将自己的速度和位置等状态信息通知给上下游的其他地铁,同时也会实时的接收其他地铁的状态信息,并实时做出反馈。例如当发现下游地铁突然意外减速,则立即调整自身速度,并将减速事件通知到上游地铁,如此,整条轨道上的所有地铁形成一种背压机制(back pressure),根据上下游状态自动调整自身速度。
下面我们来看下维基百科关于反应式编程的定义:
响应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 (propagation of change) 的声明式 (declarative) 的编程范式。
从上面的定义中,我们可以看出反应式编程的核心是数据流以及变化传递。这一定义没有区分数据流的同步和异步模式, 更准确地说,异步数据流(asynchronous data stream)才是反应式编程的最佳实践。因为传统的编程方式是顺序(同步)执行的。当任务之间存在依赖关系时,上一个任务完成之后才会执行下一个任务。如果需要提高程序的响应速度,需要把同步执行的程序改为异步执行,这样方法的执行就变为了消息发送,能够挖掘多核CPU的能力、降低延迟和阻塞。细心的读者会发现,这不就是观察者模式(Observer Pattern)嘛! 其实反应式并不是指具体的技术,而是指一些架构设计原则, 观察者模式是实现反应式的一种手段。
经过上述的学习,响应式编程的特点可以归纳如下:
-
数据流:基于数据流模型,响应式编程提供了一套统一的Stream风格的数据处理接口。与java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
-
变化传播:就是以一个数据流作为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。
-
异步编程:传统的编程方式是顺序(同步)执行的。当任务之间存在依赖关系时,上一个任务完成之后才会执行下一个任务。如果需要提高程序的响应速度,需要把同步执行的程序改为异步执行,这样方法的执行就变为了消息发送。响应式编程提供了合适的异步编程模型能够挖掘多核CPU的能力、降低延迟和阻塞。
1.2函数式编程(Functional Programming, 简称FP)
java语言中,数据类型分为基本类型和引用类型,这些类型可以用于赋值和传递,通常被称为一等公民(这个术语是从20世纪60年代美国民权运动中借用来的)。编程语言中的其他结构也许有助于我们表示值的结构,但在程序执行期间不能传递,因而是二等公民。在Java8之前,方法和类就是二等公民。
人们发现,在运行时传递方法能将方法变成一等公民,这在编程中非常有用,因此Java 8的设计者把这个功能加入到了Java中。Java 8允许“把代码传递给方法”,这种方式简洁地表达了行为参数化。虽然Java 8之前可以用匿名类实现行为参数化,但Java 8代码更加简洁、可读性更好、还让我们能够使用一整套新技巧(流操作)。这种函数可以被来回传递并加以组合,从而产生强大的编程语汇,我们将这种编程范式称为函数式编程。
下面仍以java8中的函数为例,总结函数式编程的特点如下:
-
java8中函数是“一等公民”(First Class)。函数与其他数据类型一样,处于平等地位,可以赋值给其他变量、作为入参或返回值。
-
java8支持闭包和高阶函数。(闭包是起函数的作用并可以像对象一样操作的对象。高阶函数可以使用另一个函数作为输入,同时支持返回一个函数作为输出。这两种结构结合在一起可以以优雅的方式进行模块化编程。)
-
递归。在函数式编程中,变量都是不可变的(immutable),所以函数式编程无法实现循环操作。循环操作需要依赖一个可变的状态做为跳出循环的条件。因此函数式编程把递归作为控制流程的机制。
-
惰性求值(Lazy Evaluation)。支持惰性求值的编译器会像数学家看待代数表达式那样看待函数式编程的程序,即抵消相同项从而避免执行无谓的代码。
-
没有“副作用”(Side Effect)。函数要保持独立,所有功能就是返回一个新的值,没有其他行为,不会修改外部变量的值。没有并发编程中的线程安全问题。
1.3技术演进
Future模式: JDK5中的Future模式是多线程的一种设计模式。我们可以提交任务给一个Future,Future替我们完成任务,期间我们可以去做其他想做的事情,一段时间后从Future中获取结果。虽然Future模式支持获取异步执行的任务结果,但它并没有提供通知的机制,所以我们无法得知Future什么时候完成任务。要么使用isDone()不停轮询判断Future是否完成,这样会耗费CPU资源;要么执行future.get()等待Future返回的结果,这会使程序变为同步操作。
CompletableFuture弥补了Future模式的缺点,在异步任务完成后,可以将其结果通过 thenAccept、thenApply、thenCompose等操作交给另一个异步事件处理线程来处理。(即事件通知,回调)
总结:响应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 ,以“非阻塞”和“异步”为特性,采用函数式的语法,实现并发执行效率。统一了java并发编程模型,使同步与异步的实现代码无明显差异。
1.4Rx是什么
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源的编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了。Rx的大部分语言库由ReactiveX这个组织负责维护,社区网站是 reactivex.io。
微软给的定义是,Rx是一个函数库。使用Rx开发者可以用Observables表示异步数据流,用LINQ操作符查询、组合、变换异步数据流, 用Schedulers参数化异步数据流的并发处理。Rx可以这样定义:Rx = Observables + LINQ + Schedulers。ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
向对象编程和面向切面编程(Aspect Orient Programming)在java领域是最常见的两种编程思想。Rx又不仅仅是一个编程函数库,它是一种编程思想的突破。它影响了许多其它的程序库和框架以及编程语言,很多公司都在使用Rx。
参考链接
1.5响应式宣言
2013年6月,Roland Kuhn等人发布了《响应式宣言》, 该宣言定义了反应式系统应该具备的一些架构设计原则。符合反应式设计原则的系统称为反应式系统。根据反应式宣言, 反应式系统需要具备即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)和消息驱动(Message Driven)四个特质,以下内容摘自反应式宣言官网:
-
即时响应性(Responsive)。系统应该对用户的请求即时做出响应。即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。
-
回弹性(Resilient)。 系统在出现失败时依然能保持即时响应性, 每个组件的恢复都被委托给了另一个外部的组件, 此外,在必要时可以通过复制来保证高可用性。 因此组件的客户端不再承担组件失败的处理。
-
弹性(Elastic)。 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入负载的速率变化做出反应,比如通过横向地伸缩底层计算资源。 这意味着设计上不能有中央瓶颈, 使得各个组件可以进行分片或者复制, 并在它们之间进行负载均衡。
-
消息驱动(Message Driven)。反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。
2.钻进去看本质
2.1名称解释(rajava)
2.2观察者模式
设计模式分为三类:创建型模式是将创建和使用代码解耦;结构型模式是将不同功能的代码解耦;行为型模式是将不同的行为代码解耦。观察者模式是行为型模式。
在Rx中,很多指令可能是并行执行的,之后他们的执行结果才会被“观察者”捕获。为达到这个目的,在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。
Subscribe方法用于将观察者连接到Observable,观察者需要实现以下方法的一个子集:
- onNext(T item):
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。 - onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。 - onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。
在实际的项目中,被观察者(Observable)和观察者(Observer)这两个对象的称呼比较灵活,叫法多种多样,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。这个模型通常被称作Reactor模式。Reactor 反应堆设计模式
根据应用场景的不同,观察者模式会对应不同的代码实现方式:有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。
- 同步阻塞方式
// 观察者
public interface RegObserver {void handleRegSuccess(long userId);
}
// 促销观察者
public class RegPromotionObserver implements RegObserver {private PromotionService promotionService; // 依赖注入@Overridepublic void handleRegSuccess(long userId) {promotionService.issueNewUserExperienceCash(userId);}
}
// 提醒观察者
public class RegNotificationObserver implements RegObserver {private NotificationService notificationService;@Overridepublic void handleRegSuccess(long userId) {notificationService.sendInboxMessage(userId, "Welcome...");}
}// 第一种实现
public class UserController {private UserService userService; // 依赖注入private List<RegObserver> regObservers = new ArrayList<>();// 把观察者注册,一次性设置好(支持动态的修改)public void setRegObservers(List<RegObserver> observers) {regObservers.addAll(observers);}public Long register(String telephone, String password) {//...省略非核心代码long userId = userService.register(telephone, password);// 遍历观察者,消息回调(执行过程会阻塞当前线程)for (RegObserver observer : regObservers) {observer.handleRegSuccess(userId);}return userId;}
}
- 异步非阻塞方式
// 第二种实现方式,其他类代码不变
public class UserController {private UserService userService; // 依赖注入private List<RegObserver> regObservers = new ArrayList<>();private Executor executor;public UserController(Executor executor) {this.executor = executor;}public void setRegObservers(List<RegObserver> observers) {regObservers.addAll(observers);}public Long register(String telephone, String password) {//...省略非核心代码long userId = userService.register(telephone, password);// 异步执行观察者逻辑for (RegObserver observer : regObservers) {executor.execute(new Runnable() {@Overridepublic void run() {observer.handleRegSuccess(userId);}});}return userId;}
}
上述两种实现方式的缺点是,所有逻辑都耦合在register函数中,该观察者模式无法被其他业务复用。而RxJava中提供的观察者模式隐藏实现细节,解耦业务与非业务代码,让程序员聚焦业务开发,可复用性更好。
Observable分类:
根据Observable开始发射数据序列的时机分类。
略。。。
观察者模式 vs 消息队列
响应式编程模式可以实现功能的解耦,但如果不同的系统之间需要解耦,就需要借助消息队列(Message Queue)来实现。弊端是需要引入一个新的系统(消息队列),增加了维护成本。优势是被观察者和观察者解耦更加彻底,两部分的耦合更小,被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑。
而在观察者模式中中,观察者需要注册到被观察者中,被观察者需要依次遍历观察者来发送消息。