和朱晔一起复习Java并发(五):并发容器和同步器

和朱晔一起复习Java并发(五):并发容器和同步器

本节我们先会来复习一下java.util.concurrent下面的一些并发容器,然后再会来简单看一下各种同步器。

ConcurrentHashMap和ConcurrentSkipListMap的性能

首先,我们来测试一下ConcurrentHashMap和ConcurrentSkipListMap的性能。
前者对应的非并发版本是HashMap,后者是跳表实现,Map按照Key顺序排序(当然也可以提供一个Comparator进行排序)。

在这个例子里,我们不是简单的测试Map读写Key的性能,而是实现一个多线程环境下使用Map最最常见的场景:统计Key出现频次,我们的Key的范围是1万个,然后循环1亿次(也就是Value平均也在1万左右),10个并发来操作Map:

@Slf4j
public class ConcurrentMapTest {int loopCount = 100000000;int threadCount = 10;int itemCount = 10000;@Testpublic void test() throws InterruptedException {StopWatch stopWatch = new StopWatch();stopWatch.start("hashmap");normal();stopWatch.stop();stopWatch.start("concurrentHashMap");concurrent();stopWatch.stop();stopWatch.start("concurrentSkipListMap");concurrentSkipListMap();stopWatch.stop();log.info(stopWatch.prettyPrint());}private void normal() throws InterruptedException {HashMap<String, Long> freqs = new HashMap<>();ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);synchronized (freqs) {if (freqs.containsKey(key)) {freqs.put(key, freqs.get(key) + 1);} else {freqs.put(key, 1L);}}}));forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);//log.debug("normal:{}", freqs);}private void concurrent() throws InterruptedException {ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(itemCount);ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);freqs.computeIfAbsent(key, k -> new LongAdder()).increment();}));forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);//log.debug("concurrentHashMap:{}", freqs);}private void concurrentSkipListMap() throws InterruptedException {ConcurrentSkipListMap<String, LongAdder> freqs = new ConcurrentSkipListMap<>();ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);freqs.computeIfAbsent(key, k -> new LongAdder()).increment();}));forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);//log.debug("concurrentSkipListMap:{}", freqs);}
}

这里可以看到,这里的三种实现:

  • 对于normal的实现,我们全程锁住了HashMap然后进行读写
  • 对于ConcurrentHashMap,我们巧妙利用了一个computeIfAbsent()方法,实现了判断Key是否存在,计算获取Value,put Key Value三步操作,得到一个Value是LongAdder(),然后因为LongAdder是线程安全的所以直接调用了increase()方法,一行代码实现了5行代码效果
  • ConcurrentSkipListMap也是一样

运行结果如下:
image_1dg7ckmsp1k4cde81agdpc1fit9.png-64.2kB

可以看到我们利用ConcurrentHashMap巧妙实现的并发词频统计功能,其性能相比有锁的版本高了太多。
值得注意的是,ConcurrentSkipListMap的containsKey、get、put、remove等类似操作时间复杂度是log(n),加上其有序性,所以性能和ConcurrentHashMap有差距。

如果我们打印一下ConcurrentSkipListMap最后的结果,差不多是这样的:
image_1dg7dcv63506qsiahte1s17b7m.png-353.9kB
可以看到Entry按照了Key进行排序。

ConcurrentHashMap的那些原子操作方法

这一节我们比较一下computeIfAbsent()和putIfAbsent()的区别,这2个方法很容易因为误用导致一些Bug。

  • 第一个是性能上的区别,如果Key存在的话,computeIfAbsent因为传入的是一个函数,函数压根就不会执行,而putIfAbsent需要直接传值。所以如果要获得Value代价很大的话,computeIfAbsent性能会好
  • 第二个是使用上的区别,computeIfAbsent返回是的是操作后的值,如果之前值不存在的话就返回计算后的值,如果本来就存在那么就返回本来存在的值,putIfAbsent返回的是之前的值,如果原来值不存在那么会得到null

写一个程序来验证一下:

@Slf4j
public class PutIfAbsentTest {@Testpublic void test() {ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();log.info("Start");log.info("putIfAbsent:{}", concurrentHashMap.putIfAbsent("test1", getValue()));log.info("computeIfAbsent:{}", concurrentHashMap.computeIfAbsent("test1", k -> getValue()));log.info("putIfAbsent again:{}", concurrentHashMap.putIfAbsent("test2", getValue()));log.info("computeIfAbsent again:{}", concurrentHashMap.computeIfAbsent("test2", k -> getValue()));}private String getValue() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return UUID.randomUUID().toString();}
}

在这里获取值的操作需要1s,从运行结果可以看到,第二次值已经存在的时候,putIfAbsent还耗时1s,而computeIfAbsent不是,而且还可以看到第一次值不存在的时候putIfAbsent返回了null,而computeIfAbsent返回了计算后的值:

image_1dg7e2vdb1iin1vp6113c1d4q1mkp13.png-203.7kB

使用的时候一定需要根据自己的需求来使用合适的方法。

ThreadLocalRandom的误用

之前的例子里我们用到了ThreadLocalRandom,这里简单提一下ThreadLocalRandom可能的误用:

@Slf4j
public class ThreadLocalRandomMisuse {@Testpublic void test() throws InterruptedException {ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();IntStream.rangeClosed(1, 5).mapToObj(i -> new Thread(() -> log.info("wrong:{}", threadLocalRandom.nextInt()))).forEach(Thread::start);IntStream.rangeClosed(1, 5).mapToObj(i -> new Thread(() -> log.info("ok:{}", ThreadLocalRandom.current().nextInt()))).forEach(Thread::start);TimeUnit.SECONDS.sleep(1);}
}

一句话而言,我们应该每次都ThreadLocalRandom.current().nextInt()这样用而不是实例化了ThreadLocalRandom.current()每次调用nextInt()。观察一下两次输出可以发现,wrong的那5次得到的随机数都是一样的:
image_1dg7eb7ltbg2156p17ija8b1k281g.png-338kB

ConcurrentHashMap的并发reduce功能测试

ConcurrentHashMap提供了比较高级的一些方法可以进行并发的归并操作,我们写一段程序比较一下使用遍历方式以及使用reduceEntriesToLong()统计ConcurrentHashMap中所有值的平均数的性能和写法上的差异:

@Slf4j
public class ConcurrentHashMapReduceTest {int loopCount = 100;int itemCount = 10000000;@Testpublic void test() {ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, itemCount).boxed().collect(Collectors.toMap(i -> "item" + i, Function.identity(),(o1, o2) -> o1, ConcurrentHashMap::new));StopWatch stopWatch = new StopWatch();stopWatch.start("normal");normal(concurrentHashMap);stopWatch.stop();stopWatch.start("concurrent with parallelismThreshold=1");concurrent(concurrentHashMap, 1);stopWatch.stop();stopWatch.start("concurrent with parallelismThreshold=max long");concurrent(concurrentHashMap, Long.MAX_VALUE);stopWatch.stop();log.info(stopWatch.prettyPrint());}private void normal(ConcurrentHashMap<String, Long> map) {IntStream.rangeClosed(1, loopCount).forEach(__ -> {long sum = 0L;for (Map.Entry<String, Long> item : map.entrySet()) {sum += item.getValue();}double average = sum / map.size();Assert.assertEquals(itemCount / 2, average, 0);});}private void concurrent(ConcurrentHashMap<String, Long> map, long parallelismThreshold) {IntStream.rangeClosed(1, loopCount).forEach(__ -> {double average = map.reduceEntriesToLong(parallelismThreshold, Map.Entry::getValue, 0, Long::sum) / map.size();Assert.assertEquals(itemCount / 2, average, 0);});}
}

执行结果如下:
image_1dg7etsj31t9c1cg5pg71sfr1vio1t.png-86.2kB
可以看到并行归并操作对于比较大的HashMap性能好不少,注意一点是传入的parallelismThreshold不是并行度(不是ForkJoinPool(int parallelism)的那个parallelism)的意思,而是并行元素的阈值,传入Long.MAX_VALUE取消并行,传入1充分利用ForkJoinPool。

当然,我们这里只演示了reduceEntriesToLong()一个方法,ConcurrentHashMap还有十几种各种reduceXXX()用于对Key、Value和Entry进行并行归并操作。

ConcurrentHashMap的误用

其实这里想说的之前的文章中也提到过,ConcurrentHashMap不能确保多个针对Map的操作是原子性的(除非是之前提到computeIfAbsent()和putIfAbsent()等等),比如在下面的例子里,我们有一个9990大小的ConcurrentHashMap,有多个线程在计算它离10000满员还有多少差距,然后填充差距:

@Test
public void test() throws InterruptedException {int limit = 10000;ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, limit - 10).boxed().collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),(o1, o2) -> o1, ConcurrentHashMap::new));log.info("init size:{}", concurrentHashMap.size());ExecutorService executorService = Executors.newFixedThreadPool(10);for (int __ = 0; __ < 10; __++) {executorService.execute(() -> {int gap = limit - concurrentHashMap.size();log.debug("gap:{}", gap);concurrentHashMap.putAll(LongStream.rangeClosed(1, gap).boxed().collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));});}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.HOURS);log.info("finish size:{}", concurrentHashMap.size());
}

这段代码显然是有问题的:

  • 第一,诸如size()、containsValue()等(聚合状态的)方法仅仅在没有并发更新的时候是准确的,否则只能作为统计、监控来使用,不能用于控制程序运行逻辑
  • 第二,即使size()是准确的,在计算出gap之后其它线程可能已经往里面添加数据了,虽然putAll()操作这一操作是线程安全的,但是这个这个计算gap,填补gap的逻辑并不是原子性的,不是说用了ConcurrentHashMap就不需要锁了

输出结果如下:
image_1dg7frtvg1qmgdso1cv9men15f12a.png-351.4kB

可以看到,有一些线程甚至计算出了负数的gap,最后结果是10040,比预期的limit多了40。

还有一点算不上误用,只是提一下,ConcurrentHashMap的Key/Value不能是null,而HashMap是可以的,为什么是这样呢?
下图是ConcurrentHashMap作者的回复:

image_1dg7ghtj114u219b81se319to1qg02n.png-282.5kB

意思就是如果get(key)返回了null,你搞不清楚这到底是key没有呢还是value就是null。非并发情况下你可以使用后contains(key)来判断,但是并发情况下不行,你判断的时候可能Map已经修改了。

CopyOnWriteArrayList测试

CopyOnWrite的意义在于几乎没有什么修改,而读并发超级高的场景,如果有修改,我们重起炉灶复制一份,虽然代价很大,但是这样能让99.9%的并发读实现无锁,我们来试试其性能,先是写的测试,我们比拼一下CopyOnWriteArrayList、手动锁的ArrayList以及synchronizedList包装过的ArrayList:

@Test
public void testWrite() {List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();List<Integer> arrayList = new ArrayList<>();List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());StopWatch stopWatch = new StopWatch();int loopCount = 100000;stopWatch.start("copyOnWriteArrayList");IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));stopWatch.stop();stopWatch.start("arrayList");IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {synchronized (arrayList) {arrayList.add(ThreadLocalRandom.current().nextInt(loopCount));}});stopWatch.stop();stopWatch.start("synchronizedList");IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));stopWatch.stop();log.info(stopWatch.prettyPrint());
}

10万次操作不算多,结果如下:
image_1dg7h4kskojhnuavv014o7104t34.png-73.1kB
可见CopyOnWriteArrayList的修改因为涉及到整个数据的复制,代价相当大。

再来看看读,先使用一个方法来进行1000万数据填充,然后测试,迭代1亿次:

private void addAll(List<Integer> list) {list.addAll(IntStream.rangeClosed(1, 10000000).boxed().collect(Collectors.toList()));
}@Test
public void testRead() {List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();List<Integer> arrayList = new ArrayList<>();List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());addAll(copyOnWriteArrayList);addAll(arrayList);addAll(synchronizedList);StopWatch stopWatch = new StopWatch();int loopCount = 100000000;int count = arrayList.size();stopWatch.start("copyOnWriteArrayList");IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));stopWatch.stop();stopWatch.start("arrayList");IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {synchronized (arrayList) {arrayList.get(ThreadLocalRandom.current().nextInt(count));}});stopWatch.stop();stopWatch.start("synchronizedList");IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));stopWatch.stop();log.info(stopWatch.prettyPrint());
}

执行结果如下:
image_1dg7h9gou67s1ck71rae1goeatr3h.png-83.1kB
的确没错,CopyOnWriteArrayList性能相当强悍,毕竟读取无锁,想多少并发就多少并发。

看完了大部分的并发容器我们再来看看五种并发同步器。

CountDownLatch测试

CountDownLatch在之前的文章中已经出现过N次了,也是五种并发同步器中使用最最频繁的一种,一般常见的应用场景有:

  • 等待N个线程执行完毕
  • 就像之前很多次性能测试例子,使用两个CountDownLatch,一个用来让所有线程等待主线程发起命令一起开启,一个用来给主线程等待所有子线程执行完毕
  • 异步操作的异步转同步,很多基于异步网络通讯(比如Netty)的RPC框架都使用了CountDownLatch来异步转同步,比如下面取自RocketMQ中Remoting模块的源码片段:

image_1dg7p5lfu1mh71rtuugl1er8nlk4b.png-281.7kB

来看看ResponseFuture的相关代码实现:

public class ResponseFuture {private final int opaque;private final Channel processChannel;private final long timeoutMillis;private final InvokeCallback invokeCallback;private final long beginTimestamp = System.currentTimeMillis();private final CountDownLatch countDownLatch = new CountDownLatch(1);private final SemaphoreReleaseOnlyOnce once;private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);private volatile RemotingCommand responseCommand;private volatile boolean sendRequestOK = true;private volatile Throwable cause;...  public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);return this.responseCommand;}public void putResponse(final RemotingCommand responseCommand) {this.responseCommand = responseCommand;this.countDownLatch.countDown();}
...
}

在发出网络请求后,我们等待响应,在收到响应后我们把数据放入后解锁CountDownLatch,然后等待响应的请求就可以继续拿数据。

Semaphore测试

Semaphore可以用来限制并发,假设我们有一个游戏需要限制同时在线的玩家,我们先来定义一个Player类,在这里我们通过传入的Semaphore限制进入玩家的数量。
在代码里,我们通过了之前学习到的AtomicInteger、AtomicLong和LongAdder来统计玩家的总数,最长等待时间和宗等待时长。

@Slf4j
public class Player implements Runnable {private static AtomicInteger totalPlayer = new AtomicInteger();private static AtomicLong longestWait = new AtomicLong();private static LongAdder totalWait = new LongAdder();private String playerName;private Semaphore semaphore;private LocalDateTime enterTime;public Player(String playerName, Semaphore semaphore) {this.playerName = playerName;this.semaphore = semaphore;}public static void result() {log.info("totalPlayer:{},longestWait:{}ms,averageWait:{}ms", totalPlayer.get(), longestWait.get(), totalWait.doubleValue() / totalPlayer.get());}@Overridepublic void run() {try {enterTime = LocalDateTime.now();semaphore.acquire();totalPlayer.incrementAndGet();TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();long ms = Duration.between(enterTime, LocalDateTime.now()).toMillis();longestWait.accumulateAndGet(ms, Math::max);totalWait.add(ms);//log.debug("Player:{} finished, took:{}ms", playerName, ms);}}
}

主测试代码如下:

@Test
public void test() throws InterruptedException {Semaphore semaphore = new Semaphore(10, false);ExecutorService threadPool = Executors.newFixedThreadPool(100);IntStream.rangeClosed(1, 10000).forEach(i -> threadPool.execute(new Player("Player" + i, semaphore)));threadPool.shutdown();threadPool.awaitTermination(1, TimeUnit.HOURS);Player.result();
}

我们限制并发玩家数量为10个,非公平进入,线程池是100个固定线程,总共有10000个玩家需要进行游戏,程序结束后输出如下:
image_1dg7pm9mmt3112srvku1gibprt4o.png-62kB
再来试试公平模式:
image_1dg7ps2jaepfifbmtg16fl117c5i.png-61.6kB
可以明显看到,开启公平模式后最长等待的那个玩家没有等那么久了,平均等待时间比之前略长,符合预期。

CyclicBarrier测试

CyclicBarrier用来让所有线程彼此等待,等待所有的线程或者说参与方一起到达了汇合点后一起进入下一次等待,不断循环。在所有线程到达了汇合点后可以由最后一个到达的线程做一下『后处理』操作,这个后处理操作可以在声明CyclicBarrier的时候传入,也可以通过判断await()的返回来实现。

这个例子我们实现一个简单的场景,一个演出需要等待3位演员到位才能开始表演,演出需要进行3次。我们通过CyclicBarrier来实现等到所有演员到位,到位后我们的演出需要2秒时间。

@Slf4j
public class CyclicBarrierTest {@Testpublic void test() throws InterruptedException {int playerCount = 5;int playCount = 3;CyclicBarrier cyclicBarrier = new CyclicBarrier(playerCount);List<Thread> threads = IntStream.rangeClosed(1, playerCount).mapToObj(player->new Thread(()-> IntStream.rangeClosed(1, playCount).forEach(play->{try {TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));log.debug("Player {} arrived for play {}", player, play);if (cyclicBarrier.await() ==0) {log.info("Total players {} arrived, let's play {}", cyclicBarrier.getParties(),play);TimeUnit.SECONDS.sleep(2);log.info("Play {} finished",play);}} catch (Exception e) {e.printStackTrace();}}))).collect(Collectors.toList());threads.forEach(Thread::start);for (Thread thread : threads) {thread.join();}}
}

通过if (cyclicBarrier.await() ==0)可以实现在最后一个演员到位后做冲破栅栏后的后处理操作,我们看下这个演出是不是循环了3次,并且是不是所有演员到位后才开始的:

10:35:43.333 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 1
10:35:43.333 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 1
10:35:43.333 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 1
10:35:43.367 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 1
10:35:43.376 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 1
10:35:43.377 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 1
10:35:43.378 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 2
10:35:43.432 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 2
10:35:43.434 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 2
10:35:43.473 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 2
10:35:45.382 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 1 finished
10:35:45.390 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 2
10:35:45.390 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 2
10:35:45.437 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 3
10:35:45.443 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 3
10:35:45.445 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 3
10:35:45.467 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 3
10:35:47.395 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 2 finished
10:35:47.472 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 3
10:35:47.473 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 3
10:35:49.477 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 3 finished

从这个例子可以看到,我们的演出是在最后到达的Player1演员这个线程上进行的,值得注意的一点是,在他表演的时候其他演员已经又进入了等待状态(不要误认为,CyclicBarrier会让所有线程阻塞,等待后处理完成后再让其它线程继续下一次循环),就等他表演结束后继续来到await()才能又开始新的演出。

Phaser测试

Phaser和Barrier类似,只不过前者更灵活,参与方的人数是可以动态控制的,而不是一开始先确定的。Phaser可以手动通过register()方法注册成为一个参与方,然后通过arriveAndAwaitAdvance()表示自己已经到达,等到其它参与方一起到达后冲破栅栏。

比如下面的代码,我们对所有传入的任务进行iterations次迭代操作。
Phaser终止的条件是大于迭代次数或者没有参与方,onAdvance()返回true表示终止。
我们首先让主线程成为一个参与方,然后让每一个任务也成为参与方,在新的线程中运行任务,运行完成后到达栅栏,只要栅栏没有终止则无限循环。
在主线程上我们同样也是无限循环,每一个阶段都是等待其它线程完成任务后(到达栅栏后),自己再到达栅栏开启下一次任务。

@Slf4j
public class PhaserTest {AtomicInteger atomicInteger = new AtomicInteger();@Testpublic void test() throws InterruptedException {int iterations = 10;int tasks = 100;runTasks(IntStream.rangeClosed(1, tasks).mapToObj(index -> new Thread(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}atomicInteger.incrementAndGet();})).collect(Collectors.toList()), iterations);Assert.assertEquals(tasks * iterations, atomicInteger.get());}private void runTasks(List<Runnable> tasks, int iterations) {Phaser phaser = new Phaser() {protected boolean onAdvance(int phase, int registeredParties) {return phase >= iterations - 1 || registeredParties == 0;}};phaser.register();for (Runnable task : tasks) {phaser.register();new Thread(() -> {do {task.run();phaser.arriveAndAwaitAdvance();} while (!phaser.isTerminated());}).start();}while (!phaser.isTerminated()) {doPostOperation(phaser);phaser.arriveAndAwaitAdvance();}doPostOperation(phaser);}private void doPostOperation(Phaser phaser) {while (phaser.getArrivedParties() < 100) {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}log.info("phase:{},registered:{},unarrived:{},arrived:{},result:{}",phaser.getPhase(),phaser.getRegisteredParties(),phaser.getUnarrivedParties(),phaser.getArrivedParties(), atomicInteger.get());}
}

10次迭代,每次迭代100个任务,执行一下看看:
image_1dgajh5dkphcg2o62q9301pvi9.png-416.5kB

可以看到,主线程的后处理任务的while循环结束后只有它自己没有到达栅栏,这个时候它可以做一些任务后处理工作,完成后冲破栅栏。

Exchanger测试

Exchanger实现的效果是两个线程在同一时间(会合点)交换数据,写一段代码测试一下。在下面的代码里,我们定义一个生产者线程不断发送数据,发送数据后休眠时间随机,通过使用Exchanger,消费者线程实现了在生产者发送数据后立刻拿到数据的效果,在这里我们并没有使用阻塞队列来实现:

@Slf4j
public class ExchangerTest {@Testpublic void test() throws InterruptedException {Random random = new Random();Exchanger<Integer> exchanger = new Exchanger<>();int count = 10;Executors.newFixedThreadPool(1, new ThreadFactoryImpl("producer")).execute(() -> {try {for (int i = 0; i < count; i++) {log.info("sent:{}", i);exchanger.exchange(i);TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));}} catch (InterruptedException e) {e.printStackTrace();}});ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryImpl("consumer"));executorService.execute(() -> {try {for (int i = 0; i < count; i++) {int data = exchanger.exchange(null);log.info("got:{}", data);}} catch (InterruptedException e) {e.printStackTrace();}});executorService.shutdown();executorService.awaitTermination(1, TimeUnit.HOURS);}
}

运行效果如下:
image_1dg7q8eqiffnvca166kst8pk5v.png-501.7kB

小结

并发容器这块我就不做过多总结了,ConcurrentHashMap实在是太好用太常用,但是务必注意其线程安全的特性并不是说ConcurrentHashMap怎么用都没有问题,错误使用在业务代码中很常见。

现在我们来举个看表演的例子总结一下几种并发同步器:

  • Semaphore是限制同时看表演的观众人数,有人走了后新人才能进来看
  • CountDownLatch是演职人员人不到齐表演无法开始,演完结束
  • CyclicBarrier是演职人员到期了后才能表演,最后一个到的人是导演,导演会主导整个演出,演出完毕后所有演职人员修整后重新等待大家到期
  • Phaser是每一场演出的演职人员名单可能随时会更改,但是也是要确保所有演职人员到期后才能开演

同样,代码见我的Github,欢迎clone后自己把玩,欢迎点赞。

欢迎关注我的微信公众号:随缘主人的园子

image_1dfvp8d55spm14t7erkr3mdbscf.png-45kB

转载于:https://www.cnblogs.com/lovecindywang/p/11222213.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363528.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hive:使用Apache Hive查询客户最喜欢的搜索查询和产品视图计数

这篇文章涵盖了使用Apache Hive查询存储在Hadoop下的搜索点击数据。 我们将以示例的方式生成有关总产品浏览量的客户排名靠前的搜索查询和统计信息。 继续之前的文章 使用大数据分析客户产品搜索点击次数 &#xff0c; Flume&#xff1a;使用Apache Flume收集客户产品搜索点…

expdp错误案例

转自:https://www.cnblogs.com/kerrycode/p/3960328.html Oracle数据泵(Data Dump)使用过程当中经常会遇到一些奇奇怪怪的错误案例&#xff0c;下面总结一些自己使用数据泵(Data Dump)过程当中遇到的问题以及解决方法。都是在使用过程中遇到的问题&#xff0c;以后陆续遇到数据…

HashSet源码分析:JDK源码系列

1.简介 继续分析源码&#xff0c;上一篇文章把HashMap的分析完毕。本文开始分析HashSet简单的介绍一下。 HashSet是一个无重复元素集合&#xff0c;内部使用HashMap实现&#xff0c;所以HashMap的特征耶继承了下来。存储的元素是无序的并且HashSet允许使用空的元素。 HashSet是…

修改左侧导航显示样式(转载自Sunmoonfire's artistic matrix)

这是一片非常好的文章&#xff0c;修改下CSS就可以改变左侧导航栏的样式&#xff0c;在网上找了一些都是要写代码的。怕连接失效&#xff0c;所以直接将文章考了过来&#xff0c;希望作者原谅&#xff0c;如有不妥&#xff0c;请通知一声&#xff0c;我会将文章删掉&#xff01…

tf.argmax()以及axis

tf.argmax()表示返回最大值的索引号&#xff0c;axis为0 &#xff0c;表示返回每列最大值索引号。axis为1 &#xff0c;表示返回每行最大值索引号 结果为 转载于:https://www.cnblogs.com/san333/p/10507402.html

jquery ajax 上传文件 demo,Jquery+AJAX上传文件,无刷新上传并重命名文件

index.htmlAjax上传图片Ajax上传图片function upload(){var form new FormData(document.getElementById("form"));$.ajax({url:"upload.php",type:"post",data:form,cache: false,processData: false,contentType: false,success:function(dat…

Meet Fabric8:基于Camel和ActiveMQ的开源集成平台

面料8 Fabric8是Red Hat的JBoss Fuse产品的Apache 2.0许可上游社区。 这是一个基于Apache ActiveMQ &#xff0c; Camel &#xff0c; CXF &#xff0c; Karaf &#xff0c; HawtIO等的集成平台。 它提供了自动化的配置和部署管理&#xff0c;以帮助使部署变得容易&#xff0…

Django之web框架的本质

web框架的本质及自定义web框架 我们可以这样理解&#xff1a;所有的Web应用本质上就是一个socket服务端&#xff0c;而用户的浏览器就是一个socket客户端&#xff0c;基于请求做出响应&#xff0c;客户都先请求&#xff0c;服务端做出对应的响应&#xff0c;按照http协议的请求…

Springboot 系列(十三)使用邮件服务

在我们这个时代&#xff0c;邮件服务不管是对于工作上的交流&#xff0c;还是平时的各种邮件通知&#xff0c;都是一个十分重要的存在。Java 从很早时候就可以通过 Java mail 支持邮件服务。Spring 更是对 Java mail 进行了进一步的封装&#xff0c;抽象出了 JavaMailSender. 后…

服务器能否只做c盘系统,我的云服务器只有一个c盘

我的云服务器只有一个c盘 内容精选换一换检查Pkey是否一致。查看弹性云服务器内部分配到的Pkey&#xff1a;cat /sys/class/infiniband/mlx5_0/ports/1/pkeys/* | grep -v "0x0000"检查Pkey是否一致如果环境中查出来的Pkey只有一个&#xff0c;请联系技术支持人员。如…

单例模式(C++实现)

RAII运用 只能在栈上创建对象 只能在堆上创建的对象 单例模式 设计模式 懒汉模式 解决线程安全 优化 饿汉模式 饿汉和懒汉的区别

Flume:使用Apache Flume收集客户产品搜索点击数据

这篇文章涵盖了使用Apache flume收集客户产品搜索点击并使用hadoop和elasticsearch接收器存储信息。 数据可能包含不同的产品搜索事件&#xff0c;例如基于不同方面的过滤&#xff0c;排序信息&#xff0c;分页信息&#xff0c;以及进一步查看的产品以及某些被客户标记为喜欢的…

vue-cli使用swiper4在ie以及safari报错

vue-cli项目中&#xff0c;通过npm run swiper --save-dev安装的是swiper4版本的插件&#xff0c;这样安装以后在谷歌火狐等浏览器都可以正常运行&#xff0c;但是在safari浏览器&#xff08;可能是版本太低&#xff09;还有ie&#xff08;9,10,11&#xff09;打开会报错&#…

电脑内部,小贴士:电脑内部连接标准

小贴士&#xff1a;电脑内部连接标准在介绍电脑内部连接标准之前&#xff0c;首先应该了解一下电脑内部接线的种类&#xff0c;以便分类处置。电脑内部尽管五颜六色的导线&#xff0c;其中导线的种类可以分为3 类&#xff0c;即电源线、信号线和控制线&#xff0c;而控制线又常…

太快了,太变态了:什么会影响Java中的方法调用性能?

那么这是怎么回事&#xff1f; 让我们从一个简短的故事开始。 几周前&#xff0c;我提议对Java核心libs邮件列表进行更改 &#xff0c;以覆盖当前final一些方法。 这刺激了一些讨论主题-其中之一是其中一个性能回归通过采取这是一个方法被引入的程度final免遭停止它final 。 我…

1、dubbo的概念

Dubbo是什么&#xff1f; Dubbo是阿里巴巴SOA服务化治理方案的核心框架&#xff0c;每天为2,000个服务提供3,000,000,000次访问量支持&#xff0c;并被广泛应用于阿里巴巴集团的各成员站点。Dubbo[]是一个分布式服务框架&#xff0c;致力于提供高性能和透明化的RPC远程服务调用…

轻云服务器的性能,腾讯云轻量应用服务器性能评测(以香港地域为例)

腾讯云轻量应用服务器香港节点24元/月&#xff0c;价格很不错&#xff0c;ForeignServer来说说腾讯云轻量服务器香港地域性能评测&#xff0c;包括腾讯云轻量应用服务器CPU型号配置、网络延迟速度测试&#xff1a;腾讯云香港轻量应用服务器性能评测腾讯云轻量应用服务器地域可选…

vue2.5.2版本 :MAC设置应用在127.0.0.1:80端口访问; 并将127.0.0.1指向www.yours.com ;问题“ Invalid Host header”

0.设置自己的host文件&#xff0c;将127.0.0.1指向自己想要访问的域名 127.0.0.1 www.yours.com 1.MAC设置应用在127.0.0.1&#xff1a;80端口访问&#xff1a; config/index.js目录下修改host和port 然后sudo运行npm run dev:(mac的80端口是被自身分享应用占用的&#xff0c…

Google Android 平台正式开源

Google 推出移动设备软件平台 Android 之时&#xff0c;曾向开发者开放 SDK 包&#xff0c;并许诺将在开源许可模式下开放其全部代码&#xff0c;今天&#xff0c;Google 与其合作伙伴&#xff0c;在 Open Handset Alliance 兑现了其承诺&#xff0c;用户现在可以正式下载 Andr…

JSP彩色验证码

产生验证码图片的文件-----image.jsp <% page contentType"image/jpeg" import"java.awt.*,java.awt.image.*,java.util.*,javax.imageio.*" %><%!Color getRandColor(int fc,int bc){//给定范围获得随机颜色 Random random new Random()…