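Requirement: apply discounts to products
1. Return the price for a given product name
2. Look up the product's discount rate
3. Compute the new price
Source listing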
import java.util.Random;

import lombok.Data;

@Data
public class DiscountShop {

    private final String name;
    private final Random random;

    public DiscountShop(String name) {
        this.name = name;
        // Seed the generator from the shop name so each shop yields a repeatable sequence
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    // Returns the price and discount information for a product name as "shop:price:code"
    public String getPrice(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return name + ":" + price + ":" + code;
    }

    public double calculatePrice(String product) {
        Util.delay();
        // Generate a pseudo-random price derived from the product name
        return Util.format(random.nextDouble() * product.charAt(0) + product.charAt(1));
    }
}

// Quote service
import lombok.Data;

@Data
public class Quote {

    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    // Parses a quote string of the form "shopName:price:discountCode"
    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

// Discount service
public class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    // Computes the discounted price for a quote
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    // Applies the discount percentage to a price
    private static double apply(double price, Code code) {
        Util.delay();
        return Util.format(price * (100 - code.percentage) / 100);
    }
}
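The listing above calls Util.delay() and Util.format(), which are not shown. The following is only a minimal sketch of what such a helper might look like. It assumes delay() simulates a remote call by sleeping for a fixed one second and format() rounds to two decimal places. The class name UtilSketch and the constant DELAY_MS are hypothetical, and the author's real helper may differ.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical stand-in for the Util helper used in the listing; not the author's actual code.
final class UtilSketch {

    // Assumed latency of one simulated remote call, in milliseconds.
    static final long DELAY_MS = 1000L;

    private UtilSketch() {
    }

    // Simulates a slow remote service by blocking the calling thread.
    static void delay() {
        try {
            Thread.sleep(DELAY_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still react to it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating latency", e);
        }
    }

    // Rounds a price to two decimal places (half-up).
    static double format(double value) {
        return BigDecimal.valueOf(value).setScale(2, RoundingMode.HALF_UP).doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(format(166.104));
        System.out.println(format(110.926));
    }
}
```

Because every price lookup and every discount calculation goes through delay(), each shop costs two blocking calls, which is what makes the choice of execution strategy matter.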
Implementation options
Option 1: sequential execution
// Sequential execution
public List<String> findPricesSequential(String product) {
    return discountShops.stream()
            .map(discountShop -> discountShop.getPrice(product))
            .map(Quote::parse)
            .map(Discount::applyDiscount)
            .collect(Collectors.toList());
}
Option 2: parallel stream
// Parallel stream
// Parallel streams run on a shared general-purpose thread pool (the common ForkJoinPool) with a fixed number of threads
public List<String> findPricesParallel(String product) {
    return discountShops.parallelStream()
            .map(discountShop -> discountShop.getPrice(product))
            .map(Quote::parse)
            .map(Discount::applyDiscount)
            .collect(Collectors.toList());
}
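To see how many threads that shared pool targets on a given machine, a small check like the one below may help. It is a sketch only: the class name CommonPoolInfo is hypothetical, and the reported value depends on the machine and on the java.util.concurrent.ForkJoinPool.common.parallelism system property if it is set.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper that reports the size of the pool shared by parallel streams.
final class CommonPoolInfo {

    private CommonPoolInfo() {
    }

    // Number of worker threads the common pool targets.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

When there are more shops than available threads, the parallel stream has to process them in several rounds, which is the main reason to prefer a dedicated executor sized to the number of shops.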
Option 3: composing synchronous and asynchronous operations + return values
private final Executor executor = Executors.newFixedThreadPool(discountShops.size(), ExecuterThreadFactoryBuilder.build());

// Composing synchronous and asynchronous operations
public List<String> findPricesFuture(String product) {
    List<CompletableFuture<String>> priceFutures = discountShops.stream()
            // Fetch each shop's quote asynchronously
            .map(discountShop -> CompletableFuture.supplyAsync(() -> discountShop.getPrice(product), executor))
            // Parsing normally involves no remote service and no I/O, so it can run almost
            // immediately; doing it synchronously adds little delay.
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
            .collect(Collectors.<CompletableFuture<String>>toList());

    return priceFutures.stream()
            // Wait for every future in the stream to complete and extract its result
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}
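The thenCompose method lets you pipeline two asynchronous operations: when the first one completes, its result is passed as the argument to the second.
Generally speaking, a method whose name does not end in Async runs in the same thread as the preceding task, whereas a method whose name ends in Async submits the follow-up task to a thread pool, so each task may be handled by a different thread.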
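The difference between the two can be seen in a stripped-down pipeline. This is a sketch under simplifying assumptions: the class ComposeSketch and the method discountedPrice are hypothetical, the "remote" steps are plain computations, and the default executor is used instead of the custom one above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical two-step pipeline illustrating thenApply versus thenCompose.
final class ComposeSketch {

    private ComposeSketch() {
    }

    static CompletableFuture<Double> discountedPrice(String rawPrice, int percentage) {
        return CompletableFuture
                // First asynchronous step: stands in for fetching a quote
                .supplyAsync(() -> rawPrice)
                // Cheap synchronous transformation of the first result
                .thenApply(Double::parseDouble)
                // Second asynchronous step, started with the result of the first
                .thenCompose(price -> CompletableFuture.supplyAsync(
                        () -> price * (100 - percentage) / 100));
    }

    public static void main(String[] args) {
        System.out.println(discountedPrice("200.0", 10).join());
    }
}
```

Using thenApply for the second step instead would yield a CompletableFuture nested inside another CompletableFuture; thenCompose flattens the two into a single future.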
Option 4: composing synchronous and asynchronous operations + consumer style
private final Executor executor = Executors.newFixedThreadPool(discountShops.size(), ExecuterThreadFactoryBuilder.build());

// Composing synchronous and asynchronous operations, consumer style
public List<String> printPricesStream(String product) {
    long start = System.nanoTime();
    CompletableFuture<String>[] priceFutures = discountShops.stream()
            .map(discountShop -> CompletableFuture.supplyAsync(() -> discountShop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
            // Print each result as soon as its future completes
            .peek(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
            .toArray(size -> new CompletableFuture[size]);
    System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    // Block until every future has completed
    CompletableFuture.allOf(priceFutures).join();
    return null; // return value exists only so the method fits the shared stopwatch harness
}
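The consumer-style pattern (register a callback with thenAccept, then wait with allOf) can be reduced to the sketch below. The names ConsumeSketch and consumeAll are hypothetical, and results are collected into a queue rather than printed so that the behaviour is easy to check. Unlike the method above, this variant waits on the futures returned by thenAccept, so the callbacks themselves are guaranteed to have run once join() returns.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

// Hypothetical reduction of the thenAccept + allOf pattern.
final class ConsumeSketch {

    private ConsumeSketch() {
    }

    // Registers a consumer on every future and blocks until all consumers have run.
    static List<String> consumeAll(List<String> shopNames) {
        Queue<String> seen = new ConcurrentLinkedQueue<>();
        CompletableFuture<?>[] futures = shopNames.stream()
                .map(name -> CompletableFuture.supplyAsync(() -> name + " responded"))
                // The consumer runs as soon as this particular future completes
                .map(future -> future.thenAccept(seen::add))
                .toArray(CompletableFuture[]::new);
        // allOf completes only when every future in the array has completed
        CompletableFuture.allOf(futures).join();
        // Completion order is not deterministic, so sort before returning
        return seen.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(Arrays.asList("BestPrice", "ShopEasy")));
    }
}
```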
Performance comparison
public static void main(String[] args) {
    StopWatch stopWatch = new StopWatch("Performance comparison");
    execute("sequential", () -> bestPriceFinder.findPricesSequential("myPhone27S"), stopWatch);
    execute("parallel", () -> bestPriceFinder.findPricesParallel("myPhone27S"), stopWatch);
    execute("composed CompletableFuture", () -> bestPriceFinder.findPricesFuture("myPhone27S"), stopWatch);
    execute("composed printPricesStream", () -> bestPriceFinder.printPricesStream("myPhone27S"), stopWatch);
    StopWatchUtils.logStopWatch(stopWatch);
}

private static void execute(String msg, Supplier<List<String>> s, StopWatch stopWatch) {
    stopWatch.start(msg);
    System.out.println(s.get());
    stopWatch.stop();
    System.out.println();
}
Execution results
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]

[BestPrice price is 117.57, LetsSaveBig price is 174.03, MyFavoriteShop price is 173.77, BuyItAll price is 169.89, ShopEasy price is 176.43]

[BestPrice price is 204.78, LetsSaveBig price is 190.85, MyFavoriteShop price is 128.92, BuyItAll price is 140.31, ShopEasy price is 166.1]

All shops have now responded in 7 msecs
ShopEasy price is 224.23 (done in 2034 msecs)
BuyItAll price is 111.53 (done in 2034 msecs)
MyFavoriteShop price is 119.11 (done in 2034 msecs)
BestPrice price is 127.88 (done in 2034 msecs)
LetsSaveBig price is 147.21 (done in 2034 msecs)
null

Performance comparison total cost time = 16226 ms
sequential : 10118 ms, 62.36%
parallel : 2035 ms, 12.54%
composed CompletableFuture : 2031 ms, 12.52%
composed printPricesStream : 2040 ms, 12.57%
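These figures are consistent with a simple back-of-the-envelope model, sketched below. It assumes each simulated remote call takes about one second (an inference from the measurements, not something the source states) and that each shop needs two such calls. The class TimingEstimate and its methods are hypothetical, and the parallel-stream formula is a rough model in which each worker thread handles one shop at a time.

```java
// Hypothetical back-of-the-envelope model of the measured timings.
final class TimingEstimate {

    // Assumed cost of one simulated remote call, in milliseconds (inferred, not from the source).
    static final long DELAY_MS = 1000L;

    // Each shop needs two delayed calls: one for the price, one for the discount.
    static final int CALLS_PER_SHOP = 2;

    private TimingEstimate() {
    }

    // Sequential execution pays for every call one after another.
    static long sequentialMs(int shops) {
        return (long) shops * CALLS_PER_SHOP * DELAY_MS;
    }

    // With a limited number of threads, shops are processed in ceil(shops / threads) rounds.
    static long parallelStreamMs(int shops, int threads) {
        int rounds = (shops + threads - 1) / threads;
        return (long) rounds * CALLS_PER_SHOP * DELAY_MS;
    }

    public static void main(String[] args) {
        System.out.println("sequential, 5 shops: " + sequentialMs(5) + " ms");
        System.out.println("parallel, 5 shops, 5 threads: " + parallelStreamMs(5, 5) + " ms");
        System.out.println("parallel, 5 shops, 4 threads: " + parallelStreamMs(5, 4) + " ms");
    }
}
```

Under these assumptions the model gives 10000 ms for the sequential run and 2000 ms when at least five threads are available, close to the measured 10118 ms and roughly 2030 ms. With only four threads the estimate doubles to 4000 ms, which illustrates why a pool sized to the number of shops scales better than a fixed shared pool.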
References
Java 8 in Action, Chapter 11, "CompletableFuture: composable asynchronous programming"