对我而言,使用Rx-java的最大好处之一就是无论底层调用是同步还是异步,因此代码看起来都完全相同,因此该条目的标题也是如此。
考虑一个非常简单的客户端代码用例,它执行三个缓慢运行的调用并将结果合并到一个列表中:
String op1 = service1.operation();
String op2 = service2.operation();
String op3 = service3.operation();
Arrays.asList(op1, op2, op3)
由于呼叫是同步的,因此花费的时间会增加。 为了模拟慢速调用,以下是每个方法调用中的实现类型:
public String operation() {logger.info("Start: Executing slow task in Service 1");Util.delay(7000);logger.info("End: Executing slow task in Service 1");return "operation1"
}
因此,在这些实现中使用rx-java的第一个尝试是简单地让这些长时间运行的操作返回通用类型Observable ,一个糟糕的实现看起来像这样:
public Observable<string> operation() {logger.info("Start: Executing slow task in Service 1");Util.delay(7000);logger.info("End: Executing slow task in Service 1");return Observable.just("operation 1");
}
因此,调用方实现将更改为以下内容:
Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();
查看调用者如何使用merge方法组合结果。
但是,此时每个服务调用的调用仍是同步的,为了使调用中断,可以通过以下方式使服务调用使用线程池:
public class Service1 {private static final Logger logger = LoggerFactory.getLogger(Service1.class);public Observable<String> operation() {return Observable.<String>create(s -> {logger.info("Start: Executing slow task in Service 1");Util.delay(7000);s.onNext("operation 1");logger.info("End: Executing slow task in Service 1");s.onCompleted();}).subscribeOn(Schedulers.computation());}
}
subscriptionOn使用指定的Scheduler运行实际操作。
该方法的优点在于,该服务的调用代码根本没有更改,那里的实现与以前完全相同,而服务调用现在是异步的。 如果您有兴趣进一步探索这个样本, 这里是一个GitHub库一起工作的例子。
翻译自: https://www.javacodegeeks.com/2015/02/async-abstractions-using-rx-java.html