内容概要
PriorityBlockingQueue
类能高效处理优先级任务,确保高优先级任务优先执行,它内部基于优先级堆实现,保证了元素的有序性,同时,作为BlockingQueue接口的实现,它提供了线程安全的队列操作,适用于多线程环境下的任务调度与资源管理,简洁而强大的API使得开发者能轻松应对复杂的并发场景。
核心概念
PriorityBlockingQueue
实现了一个线程安全的优先级队列,在这个队列中,元素根据它们的自然排序(如果它们实现了 Comparable
接口)或者传递给队列构造器的 Comparator
进行排序。
比如有一个打印服务,在这个系统中,用户可以提交打印任务,每个任务都有一个优先级,高优先级的任务(比如紧急的文档)应该比低优先级的任务(如日常报告)更快地被处理。
在这个场景中,PriorityBlockingQueue
可以用来存储和管理待处理的打印任务,每当一个新的打印任务被提交时,它就被添加到队列中,由于 PriorityBlockingQueue
是一个优先级队列,所以高优先级的任务会自动排在队列的前面。
打印服务的工作线程可以从队列中取出任务来处理,由于队列是线程安全的,多个工作线程可以同时从队列中安全地取出任务,而且,由于队列会根据优先级对任务进行排序,所以工作线程总是先处理优先级最高的任务。
PriorityBlockingQueue
主要解决以下类似场景的问题:
- 并发访问:在多线程环境中,
PriorityBlockingQueue
提供了安全的并发访问机制,多个线程可以同时向队列中添加或检索元素,而无需担心数据的不一致性或损坏。 - 优先级排序:队列中的元素根据其自然排序顺序或者传递给队列构造函数的比较器(
Comparator
)来排序,这使得在处理任务或消息时,可以确保首先处理优先级最高的项。 - 资源分配:在资源有限的情况下,
PriorityBlockingQueue
可以帮助确定哪些任务或请求应该首先获得资源,通过为不同的任务设置不同的优先级,系统可以优先处理更重要的任务。 - 任务调度:在任务调度系统中,
PriorityBlockingQueue
可用于管理待执行的任务,工作线程可以从队列中检索并执行优先级最高的任务,从而确保任务按照优先级顺序执行。 - 缓冲和流量控制:在高负载情况下,
PriorityBlockingQueue
可以作为缓冲区来存储待处理的项目,并通过其阻塞特性来控制流量,当队列满时,尝试添加元素的线程将被阻塞,直到队列中有可用空间;同样地,当队列为空时,尝试检索元素的线程也将被阻塞,直到有元素可用。 - 延迟执行:虽然
PriorityBlockingQueue
本身不直接支持延迟执行,但可以通过结合使用优先级和自定义的比较器来实现类似的效果,例如,可以将任务的执行时间作为优先级的一部分,并确保在执行时间之前任务不会被检索出来。
代码案例
下面是一个简单例子,演示如何使用PriorityBlockingQueue
类,这个例子中,创建了一个优先级阻塞队列,用于存储和检索Task
对象,这些对象根据它们的优先级进行排序,client代码会向队列中添加任务,并从队列中检索并处理优先级最高的任务,如下:
import java.util.concurrent.PriorityBlockingQueue; // 任务类,实现了Comparable接口以便能够根据优先级进行排序
class Task implements Comparable<Task> { private int priority; private String description; public Task(int priority, String description) { this.priority = priority; this.description = description; } public int getPriority() { return priority; } public String getDescription() { return description; } // 根据优先级对任务进行排序,优先级高的任务排在前面 @Override public int compareTo(Task other) { // 注意:这里使用Integer.compare进行比较,以正确处理负数优先级 return Integer.compare(other.priority, this.priority); // 降序排列 } @Override public String toString() { return "Task{" + "priority=" + priority + ", description='" + description + "'}"; }
} // 客户端代码,演示如何使用PriorityBlockingQueue
public class PriorityBlockingQueueExample { public static void main(String[] args) throws InterruptedException { // 创建一个优先级阻塞队列 PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(); // 向队列中添加任务 queue.put(new Task(3, "Low priority task")); queue.put(new Task(5, "Medium priority task")); queue.put(new Task(1, "Very low priority task")); queue.put(new Task(7, "High priority task")); // 从队列中检索并处理任务,直到队列为空 while (!queue.isEmpty()) { // take方法会阻塞,直到队列中有元素可用 Task task = queue.take(); System.out.println("Processing task: " + task); } }
}
在上面的代码中,创建了一个PriorityBlockingQueue
实例,并向其中添加了四个具有不同优先级的任务,然后,使用一个循环从队列中检索并处理任务,直到队列为空,由于PriorityBlockingQueue
是一个优先级队列,因此当从队列中检索任务时,优先级最高的任务总是首先被取出。
在compareTo
方法中,使用Integer.compare(other.priority, this.priority)
来对任务进行降序排列,因此,优先级数值越高的任务将被视为优先级越高,并排在队列的前面,如果想要升序排列(即优先级数值越低的任务排在前面),可以简单地调换other.priority
和this.priority
的位置。
上面代码输出如下类似内容:
Processing task: Task{priority=7, description='High priority task'}
Processing task: Task{priority=5, description='Medium priority task'}
Processing task: Task{priority=3, description='Low priority task'}
Processing task: Task{priority=1, description='Very low priority task'}
核心API
PriorityBlockingQueue
实现了 BlockingQueue
接口并使用优先级堆对元素进行排序,以下是 PriorityBlockingQueue
类中一些常用方法的含义:
1、构造方法
PriorityBlockingQueue()
: 创建一个具有默认初始容量的PriorityBlockingQueue
。PriorityBlockingQueue(int initialCapacity)
: 创建一个具有指定初始容量的PriorityBlockingQueue
。PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
: 创建一个具有指定初始容量,并根据指定的比较器对元素进行排序的PriorityBlockingQueue
。
2、插入方法
add(E e)
: 将指定的元素插入到此队列中,如果成功,则返回true
(由于队列没有容量限制,因此该方法总是返回true
,除非元素为null
)。offer(E e)
: 将指定的元素插入到此队列中,并立即返回。该方法等效于add(E e)
。put(E e) throws InterruptedException
: 将指定的元素插入到此队列中,等待必要的空间变得可用,如果当前线程被中断,则抛出InterruptedException
。
3、移除方法
remove(Object o)
: 移除队列中首次出现的指定元素(如果存在)。poll()
: 检索并移除此队列的头,如果此队列为空,则返回null
。take() throws InterruptedException
: 检索并移除此队列的头,等待元素变得可用,如果当前线程被中断,则抛出InterruptedException
。
4、检查方法
peek()
: 检索,但不移除此队列的头,如果此队列为空,则返回null
。element() throws NoSuchElementException
: 检索,但不移除此队列的头,如果此队列为空,则抛出NoSuchElementException
。
5、其他方法
size()
: 返回队列中的元素数量。isEmpty()
: 如果队列为空,则返回true
。clear()
: 移除此队列中的所有元素。contains(Object o)
: 如果队列包含指定的元素,则返回true
。remainingCapacity()
: 由于PriorityBlockingQueue
没有容量限制,此方法始终返回Integer.MAX_VALUE
。drainTo(Collection<? super E> c)
: 移除此队列中所有可用的元素,并将它们添加到给定的集合中。drainTo(Collection<? super E> c, int maxElements)
: 移除此队列中最多给定数量的可用元素,并将它们添加到给定的集合中。toArray()
: 返回包含队列中所有元素的数组。iterator()
: 返回在此队列元素上进行迭代的迭代器。注意,由于并发修改的可能性,迭代器的行为是弱一致的。comparator()
: 返回用于对此队列中的元素进行排序的比较器,如果此队列按其自然顺序排序,则返回null
。
核心总结
PriorityBlockingQueue
类允许开发者存储元素并根据其自然排序或者提供的Comparator
进行排序,其优点在于它能高效地处理需要优先级调度的任务,确保最高优先级的任务总是优先被处理,它的缺点是在高并发场景下,由于线程间的竞争,性能可能会受到影响,此外,虽然它提供了并发安全性,但在迭代过程中并不保证元素的顺序一致性。
END!
往期回顾
Java并发基础:DelayQueue全面解析!
Java并发基础:LinkedBlockingDeque全面解析!
Java并发基础:LinkedTransferQueue全面解析!
Java并发基础:LinkedBlockingQueue全面解析!
Java并发基础:Deque接口和Queue接口的区别?