线程安全的集合
文章目录
- 线程安全的集合
- 1 List 和 Set体系
- Collections中的工具方法
- 1.1 CopyOnWriteArrayList
- 1.2 CopyOnWriteArraySet
- 1.3 ConcurrentHashMap
- 2 CAS算法
- 3 Queue接口(队列)
- 3.1 ConcurrentLinkedQueue
- 3.2 BlockingQueue接口(阻塞队列)
Collection体系集合、以及线程安全集合。
注:下划线代表线程安全集合
1 List 和 Set体系
Collections中的工具方法
Collections工具类中提供了多个可以获得线程安全集合的方法。
- public static Collection synchronizedCollection(Collection c)
- public static List synchronizedList(List list)
- public static Set synchronizedSet(Set s)
- public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
- public static SortedSet synchronizedSortedSet(SortedSet s)
- public static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m)
JDK1.2提供,接口统一、维护性高,但性能没有提升,均以synchonized实现。
1.1 CopyOnWriteArrayList
- 线程安全的ArrayList,加强版读写分离。
- 写有锁,读无锁,读写之间不阻塞,优于读写锁。
- 写入时,先copy一个容器副本、再添加新元素,最后替换引用。
- 使用方式与ArrayList无异。
1.2 CopyOnWriteArraySet
- 线程安全的Set,底层使用CopyOnWriteArrayList实现。
- 唯一不同在于,使用addIfAbsent()添加元素,会遍历数组
- 如存在元素,则不添加(扔掉副本)。
1.3 ConcurrentHashMap
JDK 1.7
- 初始容量默认为16段(Segment),使用分段锁设计。
- 不对整个Map加锁,而是为每个Segment加锁。
- 当多个对象存入同一个Segment时,才需要互斥。
- 最理想状态为16个对象分别存入16个Segment,并行数量16。
- 使用方式与HashMap无异。
JDK 1.8
- 改为CAS无锁算法。
eg:
public class TestThreadSafe {public static void main(String[] args) {//ArrayList<String> list=new ArrayList<>();//CopyOnWriteArrayList list=new CopyOnWriteArrayList();//CopyOnWriteArraySet set=new CopyOnWriteArraySet();ConcurrentHashMap<String,String> hashMap=new ConcurrentHashMap<>();ExecutorService es = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {es.submit(new Runnable() {@Overridepublic void run() {for (int j = 0; j < 10; j++) {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}hashMap.put(Thread.currentThread().getName()+"..."+j,"xxx");}}});}es.shutdown();while(!es.isTerminated());System.out.println("元素个数:"+hashMap.size());}
}
2 CAS算法
CAS:Compare And Swap(比较交换算法)
- 其实现方式是基于硬件平台的汇编指令,是靠硬件来实现的,效率高。
- 并且比较和交换过程是同步的。
- CAS是一种乐观锁。
乐观锁:
- 总是认为是线程安全,不怕别的线程修改变量,如果修改了再重新尝试,直到成功。
- CAS是乐观锁。
悲观锁:
- 总是认为线程不安全,不管什么情况都进行加锁,要是获取锁失败,就阻塞。
- synchronized、ReentrantLock是悲观锁。
CAS比较交换算法,修改的方法包含三个核心参数(V,E,N)
- V:要更新的变量、E:预期值、N:新值。
- 只有当V==E时,V=N;否则表示已被更新过,则取消当前操作,继续判断直到成功。
eg:
使用代码模拟CAS算法
public class TestCAS {public static void main(String[] args) {Cas cas=new Cas();ExecutorService es = Executors.newFixedThreadPool(100);for (int i = 0; i < 100; i++) {es.submit(() -> {while(true) {int old = cas.getV();boolean b = cas.compareAndSwap(old, new Random().nextInt(100));System.out.println(Thread.currentThread().getName()+"..."+b);if(b){break;}}});}es.shutdown();}static class Cas{private int V;//更新的变量//获取V的值public int getV(){return V;}public synchronized boolean compareAndSwap(int E,int N){if(E==V){V=N;return true;}return false;}}
}
3 Queue接口(队列)
Collection的子接口,表示队列FIFO(First In First Out)
常用方法:
- 抛出异常:
- boolean add(E e) //顺序添加一个元素(到达上限后,再添加则会抛出异常)
- E remove() //获得第一个元素并移除(如果队列没有元素时,则抛异常)
- E element() //获得第一个元素但不移除(如果队列没有元素时,则抛异常)
- 返回特殊值:推荐使用
- boolean offer(E e) //顺序添加一个元素 (到达上限后,再添加则会返回false)
- E poll() //获得第一个元素并移除 (如果队列没有元素时,则返回null)
- E peek() //获得第一个元素但不移除 (如果队列没有元素时,则返回null)
3.1 ConcurrentLinkedQueue
线程安全、可高效读写的队列,高并发下性能最好的队列。
- 采用CAS比较交换算法
eg:
public class TestQueue {public static void main(String[] args) {
// Queue<String> queue=new LinkedList<>();Queue<String> queue=new ConcurrentLinkedQueue<>();//入队
// queue.offer("aaa");
// queue.offer("bbb");
// queue.offer("ccc");ExecutorService es = Executors.newFixedThreadPool(3);for (int i = 0; i < 3; i++) {es.submit(new Runnable() {@Overridepublic void run() {queue.offer(Thread.currentThread().getName());}});}//出队es.shutdown();while(!es.isTerminated());int count=queue.size();for (int i = 0; i < count; i++) {System.out.println(queue.poll());}}
}
3.2 BlockingQueue接口(阻塞队列)
Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法。
方法:
- void put(E e) //将指定元素插入此队列中,如果没有可用空间,则等待。
- E take() //获取并移除此队列头部元素,如果没有可用元素,则等待。
- 可用于解决生产者、消费者问题。
实现类:
- ArrayBlockingQueue:数组结构实现,有界队列。(手工固定上限)
- LinkedBlockingQueue:链表结构实现,有界队列。(默认上限Integer.MAX_VALUE)
eg:
package StageOne.day21.demo02;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;/*** @author 胡昊龙* @version 1.0* @description: TODO* @date 2024/1/16 14:32*/
public class TestBlockingQueue {public static void main(String[] args) {//创建阻塞队列BlockingQueue<String> queue = new LinkedBlockingQueue<>(6);//创建线程池ExecutorService es = Executors.newCachedThreadPool();//提交任务es.submit(()->{for (int i = 0; i < 30; i++) {try {queue.put("面包"+i);System.out.println("生产了"+i);} catch (InterruptedException e) {throw new RuntimeException(e);}}});es.submit(()->{for (int i = 0; i < 30; i++) {try {String take = queue.take();System.out.println("消费了"+take);} catch (InterruptedException e) {throw new RuntimeException(e);}}});es.shutdown();}
}