或是冒险从Cassandra被动地读取数据。
总览
让我们首先尝试从编程的角度定义什么是反应性。
功能反应式编程是使用功能性编程的构建块进行反应式编程的编程范例。
函数式编程是一种编程范例,是一种构建计算机程序的结构和元素的样式,这种处理将计算视为对避免状态和可变数据的数学函数的评估。 函数式编程强调产生仅取决于其输入而不取决于程序状态的结果的函数。
我们如何用Java进行函数式编程? Java是面向对象的编程语言,其中到处都有可变状态。
世界各地的所有Java开发人员都使用以下任何接口:
java.lang.Runnable,java.util.Comparator,java.util.concurrent.Callable或java.awt.event.ActionListener。 所有这些接口都只声明了一个方法。 这些接口被称为单一抽象方法或SAM。 使用这些流行的方法是创建匿名内部类。
public class RunnableTest {public static void main(Sting[] args){new Thread(new Runnable(){@Overridepublic void run(){System.out.println("A new thread is running ...");}}).start();}
}
由于语言规范中不包含函数,因此Java中的函数式编程非常困难。 通过引入“ lambda”,它将在Java 8中变得更加简单。 但是我们如何用Java进行函数式编程呢?
让我们看一个简单的例子。
@FunctionalInterface
public interface Worker {public void doWork();
}
public class FunctionalWorker {public static void main(String[] args){// anonymous inner class wayexecute( new Worker(){@Overridepublic void doWork() {System.out.println ("working ...");}});// lambda's wayexecute(() -> System.out.println("working in lambda's way ..."));}public static void execute(Worker worker){worker.doWork();}
}
响应式编程是一种围绕数据流和更改传播的编程范例。 例如,在命令式编程设置中,a:= b + c表示在表达式求值的瞬间,即为a分配了b + c的结果。 以后可以更改b或c的值,而不会影响a。 在反应式编程中,a的值将基于新值自动更新。
因此,我们应该对什么是功能响应式编程有一个很好的了解,让我们开始构建一个原型……
反应性地从Cassandra读取数据
卡桑德拉(Cassandra)是其中的NoSql存储之一,非常受欢迎。
假设我们必须构建一个头像服务。 该服务将把化身的元信息和内容直接存储在cassandra中。
我们正在使用的Java驱动程序通过executeAsync()方法为我们提供了查询cassandra异步的支持 。 调用此方法将返回一个Future。 众所周知,java Futures是可阻止的并且无法组成。
好的,所以我们有异步支持,但是我们仍然需要一种能够以被动方式读取它的方法……
Netflix建立了RxJava库,并在以后开源了该库,该库提供了Java(以及其他JVM语言)的功能性响应编程。
通过提供可过滤,选择,转换,组合和组成Observable的运算符的集合,Functional React提供了有效的执行和组合。
可以将Observable数据类型视为等效于Iterable的“推”,即“拉”。 使用Iterable,使用者从生产者和线程块中提取值,直到这些值到达为止。 与Observable类型相反,只要值可用,生产者就会将值推送给消费者。 这种方法更加灵活,因为值可以同步或异步到达。
Observable类型将四个缺少的语义添加到Gang of Four的Observer模式中,在Iterable类型中可用:
- 生产者向消费者发出没有更多可用数据的信号的能力。
- 生产者向消费者发出发生错误的信号的能力。
通过这两个简单的加法,我们统一了Iterable和Observable类型。 它们之间的唯一区别是数据流动的方向。 这一点非常重要,因为现在我们对Iterable执行的任何操作也可以对Observable执行。
让我们看看如何结合RxJava和Cassandra异步查询执行来构建Observable。
package net.devsprint.reactive.cassandra;import java.util.concurrent.ExecutorService;import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;/*** Wraps an async execution of Datastax Java driver into an observable.* */
public class ObservableCassandra {public static Observable executeAsync(final Session execution,final String query, final ExecutorService executorService) {return Observable.create(new Observable.OnSubscribeFunc() {@Overridepublic Subscription onSubscribe(final Observer observer) {try {Futures.addCallback(execution.executeAsync(query),new FutureCallback() {@Overridepublic void onSuccess(ResultSet result) {observer.onNext(result);observer.onCompleted();}@Overridepublic void onFailure(Throwable t) {observer.onError(t);}}, executorService);} catch (Throwable e) {// If any Throwable can be thrown from// executeAsyncobserver.onError(e);}return Subscriptions.empty();}});}}
executeAsync()方法返回一个Guava可监听的Future 。 在此将来添加回调可以使我们正确地实现Observer接口。
一个简单的服务可以实现如下:
package net.devsprint.reactive.cassandra;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import rx.Observable;import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;public class AvatarService {private static final String QUERY = "select * avatars";private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private final Session session;public AvatarService(Session session) {this.session = session;}Observable getAvatars() {return ObservableCassandra.executeAsync(session, QUERY, executorService);}}
假设查询很繁重,我们将提供一个单独的执行上下文,在该上下文中将执行回调。
通过这两个类,我们可以启动Avatar服务,但它不会做任何事情。 仅当至少有一个订户时,它才会开始从Cassandra获取数据。 一个完整的例子可以在Reactive Cassandra中找到。
翻译自: https://www.javacodegeeks.com/2014/01/reactive-cassandra.html