堵塞队列
先了解一下生产者消费者模式:
生产者就是生产数据的一方,消费者就是消费数据的另一方。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。
生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题,即有了MQ(Message Queue)中间件。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接推送给阻塞队列,消费者直接从阻塞队列里获取数据。
基本概念:
1)、当队列满的时候,插入元素的线程被阻塞,直达队列不满。
2)、队列为空的时候,获取元素的线程被阻塞,直到队列不空。
这种模式最常见的就是在MQ里面,即消息队列(Message Queue)框架,这里就不多谈了。
BlockQueue<T>
堵塞队列都是基于这个接口实现的
基本接口定义,个人觉得如果用堵塞队列的话就应该推荐使用堵塞方法,即put()以及take()
方法 | 抛出异常 | 带有返回值 | 堵塞 | 超时退出 |
插入方法 | add | offer | put | offer(Time) |
删除方法 | remove | poll | take | poll(Time) |
判断是否存在 | element | peek | N/A | N/A |
1)、抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
2)、返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
3)、一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
4)、超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
堵塞队列分有界和无界队列
有界:会有设定的队列大小,不能无限制的创建队列大小
无界:指理论上可以无限队列的大小,但是实际情况,每个服务器都会有大小,只能说是理论上的,实际应用的时候会容易撑满磁盘或内存,建议使用有界堵塞队列
常用的堵塞队列有一下几种:
ArrayBlockingQueue:一个由数组组成的有界堵塞队列,是一个FIFO队列,不过要求创建对象的时候创建大小,内部是由ReentrantLock和Condition实现堵塞策略,单个锁
/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;
LinkedBlockingQueue:由链表结构组成的有界堵塞队列,也是一个FIFO队列,不要求创建对象是设置默认大小,不设置为Integer.MAX_VAUE;有两个锁,一个用于插入元素,另外一个用于获取元素
/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。默认情况下,按照自然顺序,要么实现compareTo()方法,指定构造参数Comparator。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。支持延时获取的元素的阻塞队列,即可以延时推送,元素必须要实现Delayed接口。
SynchronousQueue:一个不存储元素的阻塞队列,所以在执行速度上会比其他堵塞队列要快,每一个put操作都要等待一个take操作;
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。可以从队列的头和尾都可以插入和移除元素,实现工作密取,方法名带了First对头部操作,带了last从尾部操作。
跳表(SkipList)
是一种加快查询链表速度的一种方法,以空间换时间的一种方法,代表的类有ConcurrentSkipListMap和ConcurrentSkipListSet,也是一种随机概率数据结构;
在原有的链表结构(链表是按顺序排序的)上加上一层链表结构,但这个是随机指定的,类似于数据库的索引,可加多层索引链表查询的时间复杂度为O(logn),快跟上了红黑树的查询速度,下面为图解示例:
** Head nodes Index nodes* +-+ right +-+ +-+* |2|---------------->| |--------------------->| |->null* +-+ +-+ +-+* | down | |* v v v* +-+ +-+ +-+ +-+ +-+ +-+* |1|----------->| |->| |------>| |----------->| |------>| |->null* +-+ +-+ +-+ +-+ +-+ +-+* v | | | | |* Nodes next v v v v v* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+*
写时复制容器
通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以写时复制容器也是一种读写分离的思想,读和写不同的容器。如果读的时候有多个线程正在向容器添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的,只能保证最终一致性。
常见的有CopyOnWriteArrayList以及CopyOnWriteArraySet;
适用读多写少的并发场景,常见应用:白名单/黑名单, 商品类目的访问和更新场景。
以下为图解,在写完后,原本的引用会重新指向新的数组对象,所以就会存在内存占用问题。