1. 写在前面
ConcurrentLinkedDeque 是 Java 中一个高效、线程安全的双端队列(Deque),使用无锁算法(CAS 操作)来保证线程安全性。由于其复杂的实现和广泛的应用场景,它常常成为面试中的重点考察对象。不知道下面几个问题你在面试过程中有没有被问到过?
- ConcurrentLinkedDeque与ConcurrentLinkedQueue有什么区别?
- ConcurrentLinkedDeque 使用了什么技术来保证线程安全?
- 如何实现 ConcurrentLinkedDeque 的无锁插入操作?
- ConcurrentLinkedDeque 如何保证队列的一致性?
- ConcurrentLinkedDeque 的 pollFirst 方法的实现逻辑是什么?
- ConcurrentLinkedDeque 是否支持 null 元素?
- 在什么场景下使用 ConcurrentLinkedDeque 比较合适?
2.从使用说起
2.1 基本操作
ConcurrentLinkedDeque 提供了丰富的基本操作,包括在队列两端进行插入、删除和查看操作。以下是一些基本操作的示例:
import java.util.concurrent.ConcurrentLinkedDeque;public class BasicOperations {public static void main(String[] args) {ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();// 在队列头部插入元素deque.addFirst("A");deque.addFirst("B");// 在队列尾部插入元素deque.addLast("C");deque.addLast("D");// 查看头部和尾部元素System.out.println("Head: " + deque.peekFirst()); // BSystem.out.println("Tail: " + deque.peekLast()); // D// 从队列头部移除元素System.out.println("Removed from head: " + deque.pollFirst()); // B// 从队列尾部移除元素System.out.println("Removed from tail: " + deque.pollLast()); // D}
}
2.2 并发生产者-消费者模型
ConcurrentLinkedDeque 非常适合用于生产者-消费者模型,其中多个生产者线程向队列添加任务,多个消费者线程从队列中取出任务进行处理。
import java.util.concurrent.ConcurrentLinkedDeque;public class ProducerConsumer {private static ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();public static void main(String[] args) {// 启动生产者线程Thread producer1 = new Thread(new Producer(), "Producer-1");Thread producer2 = new Thread(new Producer(), "Producer-2");// 启动消费者线程Thread consumer1 = new Thread(new Consumer(), "Consumer-1");Thread consumer2 = new Thread(new Consumer(), "Consumer-2");producer1.start();producer2.start();consumer1.start();consumer2.start();}static class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {String item = Thread.currentThread().getName() + "-Item-" + i;deque.addLast(item);System.out.println(Thread.currentThread().getName() + " produced " + item);try {Thread.sleep(100); // 模拟生产时间} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}static class Consumer implements Runnable {@Overridepublic void run() {while (true) {String item = deque.pollFirst();if (item != null) {System.out.println(Thread.currentThread().getName() + " consumed " + item);} else {try {Thread.sleep(50); // 队列为空时等待} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}}
}
2.3 双端队列的应用:双向搜索
双端队列可以用于实现双向搜索算法,例如在图搜索中同时从起点和终点进行搜索,直到搜索路径相交。
import java.util.concurrent.ConcurrentLinkedDeque;public class BidirectionalSearch {public static void main(String[] args) {ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();// 初始化搜索队列deque.addFirst(0); // 起点deque.addLast(100); // 终点// 模拟双向搜索while (!deque.isEmpty()) {// 从头部进行搜索Integer head = deque.pollFirst();if (head != null) {System.out.println("Searching from start: " + head);// 假设找到目标if (head == 50) break;// 向队列中添加新的搜索节点deque.addLast(head + 1);}// 从尾部进行搜索Integer tail = deque.pollLast();if (tail != null) {System.out.println("Searching from end: " + tail);// 假设找到目标if (tail == 50) break;// 向队列中添加新的搜索节点deque.addFirst(tail - 1);}}}
}
2.4 任务调度
ConcurrentLinkedDeque 可以用于任务调度系统,支持任务的动态添加和优先级调整。
import java.util.concurrent.ConcurrentLinkedDeque;public class TaskScheduler {private static ConcurrentLinkedDeque<String> taskQueue = new ConcurrentLinkedDeque<>();public static void main(String[] args) {// 添加普通任务taskQueue.addLast("Task1");taskQueue.addLast("Task2");// 添加高优先级任务taskQueue.addFirst("HighPriorityTask");// 处理任务while (!taskQueue.isEmpty()) {String task = taskQueue.pollFirst();System.out.println("Processing " + task);}}
}
2.5 线程安全的双端队列
在多线程环境中,ConcurrentLinkedDeque 可以作为一个线程安全的双端队列,用于存储和处理数据
import java.util.concurrent.ConcurrentLinkedDeque;public class ThreadSafeDeque {private static ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();public static void main(String[] args) {// 启动多个线程进行操作Thread t1 = new Thread(() -> {for (int i = 0; i < 5; i++) {deque.addFirst(i);System.out.println("Thread 1 added " + i + " to the front");}});Thread t2 = new Thread(() -> {for (int i = 5; i < 10; i++) {deque.addLast(i);System.out.println("Thread 2 added " + i + " to the end");}});Thread t3 = new Thread(() -> {while (!deque.isEmpty()) {Integer item = deque.pollFirst();if (item != null) {System.out.println("Thread 3 removed " + item + " from the front");}}});t1.start();t2.start();t3.start();}
}
3. addFirst(E e)底层实现原理
下面这段代码是 ConcurrentLinkedDeque 中用于在队列头部插入元素的方法实现,下面我们逐行分析这段代码,解释其工作原理。
public void addFirst(E e) {linkFirst(e);}
private void linkFirst(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);restartFromHead:for (;;)for (Node<E> h = head, p = h, q;;) {if ((q = p.prev) != null &&(q = (p = q).prev) != null)// Check for head updates every other hop.// If p == q, we are sure to follow head instead.p = (h != (h = head)) ? h : q;else if (p.next == p) // PREV_TERMINATORcontinue restartFromHead;else {// p is first nodenewNode.lazySetNext(p); // CAS piggybackif (p.casPrev(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != h) // hop two nodes at a timecasHead(h, newNode); // Failure is OK.return;}// Lost CAS race to another thread; re-read prev}}}
3.1 方法签名
public void addFirst(E e) {linkFirst(e);
}
addFirst(E e) 方法是公开的接口,用于在队列头部插入元素。它调用了私有方法 linkFirst(E e) 来完成实际的插入操作。
3.2 私有方法 linkFirst
private void linkFirst(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);
- checkNotNull(e):检查传入的元素是否为 null,如果是 null 则抛出 NullPointerException。
- final Node newNode = new Node(e):创建一个新的节点 newNode,其值为 e。
3.3 循环和节点遍历
restartFromHead:for (;;)for (Node<E> h = head, p = h, q;;) {if ((q = p.prev) != null &&(q = (p = q).prev) != null)// Check for head updates every other hop.// If p == q, we are sure to follow head instead.p = (h != (h = head)) ? h : q;else if (p.next == p) // PREV_TERMINATORcontinue restartFromHead;else {// p is first nodenewNode.lazySetNext(p); // CAS piggybackif (p.casPrev(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != h) // hop two nodes at a timecasHead(h, newNode); // Failure is OK.return;}// Lost CAS race to another thread; re-read prev}}
}
- restartFromHead::标签,用于在需要时重新从头开始遍历。
- for (;😉:无限循环,直到插入操作成功。
- for (Node h = head, p = h, q;😉:内部循环,用于遍历节点链表。
- if ((q = p.prev) != null && (q = (p = q).prev) != null):尝试向前移动两个节点。如果 p.prev 和 p.prev.prev 都不为 null,则更新 p 和 q。
- p = (h != (h = head)) ? h : q:每隔两个节点检查一次头节点 head,如果头节点更新了,则重新设置 p。
- else if (p.next == p) // PREV_TERMINATOR:检查是否遇到终止节点(PREV_TERMINATOR),如果是,则重新从头开始遍历。
- else:找到了头节点,可以插入新节点。
- newNode.lazySetNext§:设置新节点的 next 指针指向当前节点 p
- if (p.casPrev(null, newNode)):使用 CAS 操作将当前节点 p 的 prev 指针从 null 设置为新节点 newNode。如果成功,表示新节点成功插入到队列头部。
- if (p != h) // hop two nodes at a time:如果当前节点 p 不是头节点 h,则尝试更新头节点。
- casHead(h, newNode):使用 CAS 操作更新头节点为新节点 newNode。
- return:插入成功,返回。
- // Lost CAS race to another thread; re-read prev:如果 CAS 操作失败,表示有其他线程同时插入了节点,重新读取 prev 指针并重试。
4. addLast(E e)底层实现原理
下面这段代码是 ConcurrentLinkedDeque 中用于在队列尾部插入元素的方法实现。下面我们逐行分析这段代码,解释其工作原理。
public void addLast(E e) {linkLast(e);}
private void linkLast(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);restartFromTail:for (;;)for (Node<E> t = tail, p = t, q;;) {if ((q = p.next) != null &&(q = (p = q).next) != null)// Check for tail updates every other hop.// If p == q, we are sure to follow tail instead.p = (t != (t = tail)) ? t : q;else if (p.prev == p) // NEXT_TERMINATORcontinue restartFromTail;else {// p is last nodenewNode.lazySetPrev(p); // CAS piggybackif (p.casNext(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return;}// Lost CAS race to another thread; re-read next}}}
4.1 方法签名
public void addLast(E e) {linkLast(e);
}
addLast(E e) 方法是公开的接口,用于在队列尾部插入元素。它调用了私有方法 linkLast(E e) 来完成实际的插入操作。
4.2 私有方法 linkLast
private void linkLast(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);
- checkNotNull(e):检查传入的元素是否为 null,如果是 null 则抛出 NullPointerException。
- final Node newNode = new Node(e):创建一个新的节点 newNode,其值为 e。
4.3 循环和节点遍历
restartFromTail:for (;;)for (Node<E> t = tail, p = t, q;;) {if ((q = p.next) != null &&(q = (p = q).next) != null)// Check for tail updates every other hop.// If p == q, we are sure to follow tail instead.p = (t != (t = tail)) ? t : q;else if (p.prev == p) // NEXT_TERMINATORcontinue restartFromTail;else {// p is last nodenewNode.lazySetPrev(p); // CAS piggybackif (p.casNext(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return;}// Lost CAS race to another thread; re-read next}}
}
- restartFromTail::标签,用于在需要时重新从尾部开始遍历。
- for (;😉:无限循环,直到插入操作成功。
- for (Node t = tail, p = t, q;😉:内部循环,用于遍历节点链表。
- if ((q = p.next) != null && (q = (p = q).next) != null):尝试向后移动两个节点。如果 p.next 和 p.next.next 都不为 null,则更新 p 和 q。
- p = (t != (t = tail)) ? t : q:每隔两个节点检查一次尾节点 tail,如果尾节点更新了,则重新设置 p。
- else if (p.prev == p) // NEXT_TERMINATOR:检查是否遇到终止节点(NEXT_TERMINATOR),如果是,则重新从尾部开始遍历。
- else:找到了尾节点,可以插入新节点。
- newNode.lazySetPrev§:设置新节点的 prev 指针指向当前节点 p。
- if (p.casNext(null, newNode)):使用 CAS 操作将当前节点 p 的 next 指针从 null 设置为新节点 newNode。如果成功,表示新节点成功插入到队列尾部。
- if (p != t) // hop two nodes at a time:如果当前节点 p 不是尾节点 t,则尝试更新尾节点。
- casTail(t, newNode):使用 CAS 操作更新尾节点为新节点 newNode。
- return:插入成功,返回。
- Lost CAS race to another thread; re-read next:如果 CAS 操作失败,表示有其他线程同时插入了节点,重新读取 next 指针并重试。
5. pollFirst()的底层实现原理
下面这段代码是 ConcurrentLinkedDeque 中用于从队列头部移除并返回第一个元素的方法实现。面我们逐行分析这段代码,解释其工作原理。
5.1 方法签名
public E pollFirst() {
pollFirst() 方法是公开的接口,用于从队列头部移除并返回第一个元素。如果队列为空,则返回 null。
5.2 方法实现
for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;if (item != null && p.casItem(item, null)) {unlink(p);return item;}}return null;
}
- for (Node p = first(); p != null; p = succ§):循环遍历队列中的节点。
- Node p = first():获取队列的第一个节点。
- p != null:如果当前节点 p 不为 null,则继续循环。
- p = succ§:将当前节点 p 更新为其后继节点。
- E item = p.item:获取当前节点 p 的元素值 item。
- if (item != null && p.casItem(item, null)):检查当前节点的元素值是否不为 null,并尝试使用 CAS 操作将当前节点的元素值从 item 设置为 null。
- item != null:确保当前节点的元素值不为 null。
- p.casItem(item, null):使用 CAS 操作将当前节点的元素值从 item 设置为 null。如果成功,表示当前节点的元素值被成功移除。
- unlink§:调用 unlink§ 方法,将当前节点 p 从链表中解除链接。
- unlink§ 方法的作用是将当前节点从链表中移除,具体实现可能包括更新前驱节点和后继节点的指针,以确保链表的完整性。
- return item:返回被移除的元素值 item。
- return null:如果循环结束且没有找到非空的节点,则返回 null,表示队列为空或者没有可移除的元素。
系列文章
1.JDK源码阅读之环境搭建
2.JDK源码阅读之目录介绍
3.jdk源码阅读之ArrayList(上)
4.jdk源码阅读之ArrayList(下)
5.jdk源码阅读之HashMap
6.jdk源码阅读之HashMap(下)
7.jdk源码阅读之ConcurrentHashMap(上)
8.jdk源码阅读之ConcurrentHashMap(下)
9.jdk源码阅读之ThreadLocal
10.jdk源码阅读之ReentrantLock
11.jdk源码阅读之CountDownLatch
12.jdk源码阅读之CyclicBarrier
13.jdk源码阅读之Semaphore
14.jdk源码阅读之线程池(上)
15.jdk源码阅读之线程池(下)
16.jdk源码阅读之ArrayBlockingQueue
17.jdk源码阅读之LinkedBlockingQueue
18.jdk源码阅读之CopyOnWriteArrayList
19.jdk源码阅读之FutureTask
20.jdk源码阅读之CompletableFuture
21.jdk源码阅读之AtomicLong
22.jdk源码阅读之Thread(上)
23.jdk源码阅读之Thread(下)
24.jdk源码阅读之ExecutorService
25.jdk源码阅读之Executors
26.jdk源码阅读之ConcurrentLinkedQueue