网站在广告法之前做的/黄冈网站推广优化找哪家

网站在广告法之前做的,黄冈网站推广优化找哪家,土木工程网官网登录,网站开发人员配置大纲 1.并发安全的数组列表CopyOnWriteArrayList 2.并发安全的链表队列ConcurrentLinkedQueue 3.并发编程中的阻塞队列概述 4.JUC的各种阻塞队列介绍 5.LinkedBlockingQueue的具体实现原理 6.基于两个队列实现的集群同步机制 4.JUC的各种阻塞队列介绍 (1)基于数组的阻塞…

大纲

1.并发安全的数组列表CopyOnWriteArrayList

2.并发安全的链表队列ConcurrentLinkedQueue

3.并发编程中的阻塞队列概述

4.JUC的各种阻塞队列介绍

5.LinkedBlockingQueue的具体实现原理

6.基于两个队列实现的集群同步机制

4.JUC的各种阻塞队列介绍

(1)基于数组的阻塞队列ArrayBlockingQueue

(2)基于链表的阻塞队列LinkedBlockingQueue

(3)优先级阻塞队列PriorityBlockingQueue

(4)延迟阻塞队列DelayQueue

(5)无存储结构的阻塞队列SynchronousQueue

(6)阻塞队列结合体LinkedTransferQueue

(7)双向阻塞队列LinkedBlockingDeque

(1)基于数组的阻塞队列ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组实现的阻塞队列。其构造方法可以指定:数组的长度、公平还是非公平、数组的初始集合。

ArrayBlockingQueue会通过ReentrantLock来解决线程竞争的问题,以及采用Condition来解决线程的唤醒与阻塞的问题。

//A bounded BlockingQueue backed by an array.  
//This queue orders elements FIFO (first-in-first-out).  
//The head of the queue is that element that has been on the queue the longest time.  
//The tail of the queue is that element that has been on the queue the shortest time. 
//New elements are inserted at the tail of the queue, 
//and the queue retrieval operations obtain elements at the head of the queue.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //The queued itemsfinal Object[] items;//items index for next take, poll, peek or removeint takeIndex;//items index for next put, offer, or addint putIndex;//Number of elements in the queueint count;//Main lock guarding all accessfinal ReentrantLock lock;//Condition for waiting takesprivate final Condition notEmpty;//Condition for waiting putsprivate final Condition notFull;//Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.public ArrayBlockingQueue(int capacity) {this(capacity, false);}//Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}//Inserts the specified element at the tail of this queue, //waiting for space to become available if the queue is full.public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {notFull.await();}enqueue(e);} finally {lock.unlock();}}//Inserts element at current put position, advances, and signals.//Call only when holding lock.private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length) {putIndex = 0;}count++;notEmpty.signal();}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {notEmpty.await();}return dequeue();} finally {lock.unlock();}}//Returns the number of elements in this queue.public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}...
}

(2)基于链表的阻塞队列LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,它可以不指定阻塞队列的长度,它的默认长度是Integer.MAX_VALUE。由于这个默认长度非常大,一般也称LinkedBlockingQueue为无界队列。

//An optionally-bounded BlockingQueue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time. 
//New elements are inserted at the tail of the queue, 
//and the queue retrieval operations obtain elements at the head of the queue.
//Linked queues typically have higher throughput than array-based queues 
//but less predictable performance in most concurrent applications.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...//The capacity bound, or Integer.MAX_VALUE if noneprivate final int capacity;//Current number of elementsprivate final AtomicInteger count = new AtomicInteger();//Head of linked list.transient Node<E> head;//Tail of linked list.private transient Node<E> last;//Lock held by take, poll, etcprivate final ReentrantLock takeLock = new ReentrantLock();//Lock held by put, offer, etcprivate final ReentrantLock putLock = new ReentrantLock();//Wait queue for waiting takesprivate final Condition notEmpty = takeLock.newCondition();//Wait queue for waiting putsprivate final Condition notFull = putLock.newCondition();//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}public int size() {return count.get();}...
}

(3)优先级阻塞队列PriorityBlockingQueue

PriorityBlockingQueue是一个支持自定义元素优先级的无界阻塞队列。默认情况下添加的元素采用自然顺序升序排列,当然可以通过实现元素的compareTo()方法自定义优先级规则。

PriorityBlockingQueue是基于数组实现的,这个数组会自动进行动态扩容。在应用方面,消息中间件可以基于优先级阻塞队列来实现。

//An unbounded BlockingQueue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.  
//While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 
//This class does not permit null elements.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, //if comparator is null: For each node n in the heap and each descendant d of n, n <= d.//The element with the lowest value is in queue[0], assuming the queue is nonempty.private transient Object[] queue;//The number of elements in the priority queue.private transient int size;//The comparator, or null if priority queue uses elements' natural ordering.private transient Comparator<? super E> comparator;//Lock used for all public operationsprivate final ReentrantLock lock;//Condition for blocking when emptyprivate final Condition notEmpty;//Spinlock for allocation, acquired via CAS.private transient volatile int allocationSpinLock;//Creates a PriorityBlockingQueue with the default initial capacity (11) that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {if (initialCapacity < 1) {throw new IllegalArgumentException();}this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never block.public void put(E e) {offer(e); // never need to block}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never return false.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length)) {tryGrow(array, cap);}try {Comparator<? super E> cmp = comparator;if (cmp == null) {siftUpComparable(n, e, array);} else {siftUpUsingComparator(n, e, array, cmp);}size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//Tries to grow array to accommodate at least one more element (but normally expand by about 50%), //giving up (allowing retry) on contention (which we expect to be rare). Call only while holding lock.private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {throw new OutOfMemoryError();}newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array) {newArray = new Object[newCap];}} finally {allocationSpinLock = 0;}}if (newArray == null) {// back off if another thread is allocatingThread.yield();}lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {while(k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = x;}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null) {notEmpty.await();}} finally {lock.unlock();}return result;}public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}...
}

(4)延迟阻塞队列DelayQueue

DelayQueue是一个支持延迟获取元素的无界阻塞队列,它是基于优先级队列PriorityQueue实现的。

往DelayQueue队列插入元素时,可以按照自定义的delay时间进行排序。也就是队列中的元素顺序是按照到期时间排序的,只有delay时间小于或等于0的元素才能够被取出。

DelayQueue的应用场景有:

一.订单超时支付需要自动取消订单

二.任务超时处理需要自动丢弃任务

三.消息中间件的实现

//An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired.
//The head of the queue is that Delayed element whose delay expired furthest in the past.
//If no delay has expired there is no head and poll will return null. 
//Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
//Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. 
//For example, the size method returns the count of both expired and unexpired elements.
//This queue does not permit null elements.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();//Thread designated to wait for the element at the head of the queue.//When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.//The leader thread must signal some other thread before returning from take() or poll(...), //unless some other thread becomes leader in the interim.  //Whenever the head of the queue is replaced with an element with an earlier expiration time, //the leader field is invalidated by being reset to null, and some waiting thread, //but not necessarily the current leader, is signalled. //So waiting threads must be prepared to acquire and lose leadership while waiting.private Thread leader = null;//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.private final Condition available = lock.newCondition();//Creates a new {@code DelayQueue} that is initially empty.public DelayQueue() {}//Inserts the specified element into this delay queue. //As the queue is unbounded this method will never block.public void put(E e) {offer(e);}//Inserts the specified element into this delay queue.public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}//Retrieves and removes the head of this queue, //waiting if necessary until an element with an expired delay is available on this queue.public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await();} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {return q.poll();}  first = null; // don't retain ref while waitingif (leader != null) {available.await();} else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) {leader = null;}}}}}} finally {if (leader == null && q.peek() != null) {available.signal();}lock.unlock();}}...
}public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable {//Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: //For each node n in the heap and each descendant d of n, n <= d.  //The element with the lowest value is in queue[0], assuming the queue is nonempty.transient Object[] queue; //The number of elements in the priority queue.private int size = 0;//The comparator, or null if priority queue uses elements' natural ordering.private final Comparator<? super E> comparator;public E peek() {return (size == 0) ? null : (E) queue[0];}//Inserts the specified element into this priority queue.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}modCount++;int i = size;if (i >= queue.length) {grow(i + 1);}size = i + 1;if (i == 0) {queue[0] = e;} else {siftUp(i, e);}return true;}//Increases the capacity of the array.private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0) {newCapacity = hugeCapacity(minCapacity);}queue = Arrays.copyOf(queue, newCapacity);}private void siftUp(int k, E x) {if (comparator != null) {siftUpUsingComparator(k, x);} else {siftUpComparable(k, x);}}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = key;}@SuppressWarnings("unchecked")private void siftUpUsingComparator(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = x;}...
}

(5)无存储结构的阻塞队列SynchronousQueue

SynchronousQueue的内部没有容器来存储数据,因此当生产者往其添加一个元素而没有消费者去获取元素时,生产者会阻塞。当消费者往其获取一个元素而没有生产者去添加元素时,消费者也会阻塞。

SynchronousQueue的本质是借助了无容量存储的特点,来实现生产者线程和消费者线程的即时通信,所以它特别适合在两个线程之间及时传递数据。

线程池是基于阻塞队列来实现生产者/消费者模型的。当向线程池提交任务时,首先会把任务放入阻塞队列中,然后线程池中会有对应的工作线程专门处理阻塞队列中的任务。

Executors.newCachedThreadPool()就是基于SynchronousQueue来实现的,它会返回一个可以缓存的线程池。如果这个线程池大小超过处理当前任务所需的数量,会灵活回收空闲线程。当任务数量增加时,这个线程池会不断创建新的工作线程来处理这些任务。

public class Executors {...//Creates a thread pool that creates new threads as needed, //but will reuse previously constructed threads when they are available.//These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.//Calls to execute will reuse previously constructed threads if available. //If no existing thread is available, a new thread will be created and added to the pool. //Threads that have not been used for sixty seconds are terminated and removed from the cache. //Thus, a pool that remains idle for long enough will not consume any resources. //Note that pools with similar properties but different details (for example, timeout parameters)//may be created using ThreadPoolExecutor constructors.public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}...
}

(6)阻塞队列结合体LinkedTransferQueue

LinkedTransferQueue是一个基于链表实现的无界阻塞TransferQueue。

阻塞队列的特性是根据队列的数据情况来阻塞生产者线程或消费者线程,TransferQueue的特性是生产者线程生产数据后必须等消费者消费才返回。

LinkedTransferQueue是TransferQueue和LinkedBlockingQueue的结合体,而SynchronousQueue内部其实也是基于TransferQueue来实现的,所以LinkedTransferQueue是带有阻塞队列功能的SynchronousQueue。

(7)双向阻塞队列LinkedBlockingDeque

LinkedBlockingDeque是一个基于链表实现的双向阻塞队列,双向队列的两端都可以插入和移除元素,可减少多线程并发下的一半竞争。

图片

5.LinkedBlockingQueue的具体实现原理

(1)阻塞队列的设计分析

(2)有界队列LinkedBlockingQueue

(3)LinkedBlockingQueue的put()方法

(4)LinkedBlockingQueue的take()方法

(5)LinkedBlockingQueue使用两把锁拆分锁功能

(6)LinkedBlockingQueue的size()方法和迭代

(7)对比LinkedBlockingQueue链表队列和ArrayBlockingQueue数组队列

(1)阻塞队列的设计分析

阻塞队列的特性为:如果队列为空,消费者线程会被阻塞。如果队列满了,生产者线程会被阻塞。

为了实现这个特性:如何让线程在满足某个特定条件的情况下实现阻塞和唤醒?阻塞队列中的数据应该用什么样的容器来存储?

线程的阻塞和唤醒,可以使用wait/notify或者Condition。阻塞队列中数据的存储,可以使用数组或者链表。

(2)有界队列LinkedBlockingQueue

一.并发安全的无界队列

比如ConcurrentLinkedQueue,是没有边界没有大小限制的。它就是一个单向链表,可以无限制的往里面去存放数据。如果不停地往无界队列里添加数据,那么可能会导致内存溢出。

二.并发安全的有界队列

比如LinkedBlockingQueue,是有边界的有大小限制的。它也是一个单向链表,如果超过了限制,往队列里添加数据就会被阻塞。因此可以限制内存队列的大小,避免内存队列无限增长,最后撑爆内存。

(3)LinkedBlockingQueue的put()方法

put()方法是阻塞添加元素的方法,当队列满时,阻塞添加元素的线程。

首先把添加的元素封装成一个Node对象,该对象表示链表中的一个结点。

然后使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁,加锁的目的是保证数据添加到队列过程中的安全性 + 避免队列长度超阈值。

接着调用enqueue()方法把封装的Node对象存储到链表尾部,然后通过AtomicInteger来递增当前阻塞队列中的元素个数。

最后根据AtomicInteger类型的变量判断队列元素是否已超阈值。

注意:这里用到了一个很重要的属性notFull。notFull是一个Condition对象,用来阻塞和唤醒生产者线程。如果队列元素个数等于最大容量,就调用notFull.await()阻塞生产者线程。如果队列元素个数小于最大容量,则调用notFull.signal()唤醒生产者线程。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;//将添加的元素封装成一个Node对象Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//当前队列元素的数量putLock.lockInterruptibly();//加可被中断的锁try {//注意:这里用到了一个很重要的属性notFull,它是一个Condition对象,用来阻塞和唤醒生产者线程//如果阻塞队列当前的元素个数等于最大容量,就调用notFull.await()方法来阻塞生产者线程while (count.get() == capacity) {notFull.await();//阻塞当前线程,并释放锁}//把封装的Node对象存储到链表中enqueue(node);//通过AtomicInteger来递增当前阻塞队列中的元素个数,用于后续判断是否已超阻塞队列的最大容量c = count.getAndIncrement();//根据AtomicInteger类型的变量判断队列元素是否已超阈值if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();//释放锁}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {//node先成为当前last的next//然后last又指向last的next(即node)last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}...
}

(4)LinkedBlockingQueue的take()方法

take()方法是阻塞获取元素的方法,当队列为空时,阻塞获取元素的线程。

首先使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁。

然后判断元素个数是否为0,若是则通过notEmpty.await()阻塞消费者线程。

否则接着调用dequeue()方法从链表头部获取一个元素,并通过AtomicInteger来递减当前阻塞队列中的元素个数。

最后判断阻塞队列中的元素个数是否大于1,如果是,则调用notEmpty.signal()唤醒被阻塞的消费者线程。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//获取一个可中断的锁takeLock.lockInterruptibly();try {//判断元素个数是否为0while (count.get() == 0) {notEmpty.await();//阻塞当前线程并释放锁}//调用dequeue()方法从链表中获取一个元素x = dequeue();//通过AtomicInteger来递减当前阻塞队列中的元素个数c = count.getAndDecrement();//判断阻塞队列中的元素个数是否大于1//如果是,则调用notEmpty.signal()唤醒被阻塞的消费者消除if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}//首先获取链表的头结点head//然后拿到头结点的下一个结点first//然后把原来的头结点从队列中移除,设置first结点的数据为null,并将first结点设置为新的头结点//最后返回first结点的数据private E dequeue() {Node<E> h = head;//h指向headNode<E> first = h.next;//first指向h的nexth.next = h;// help GChead = first;E x = first.item;first.item = null;return x;}...
}

(5)LinkedBlockingQueue使用两把锁拆分锁功能

两把独占锁可以提升并发性能,因为出队和入队用的是不同的锁。这样在并发出队和入队的时候,出队和入队就可以同时执行,不会锁冲突。

这也是锁优化的一种思想,通过将一把锁按不同的功能进行拆分,使用不同的锁控制不同功能下的并发冲突,从而提升性能。

(6)LinkedBlockingQueue的size()方法和迭代

一.size()方法获取的结果也不是100%准确

LinkedBlockingQueue的size()方法获取元素个数是通过AtomicInteger获取的。

相比ConcurrentLinkedQueue通过遍历队列获取,准确性大很多。

相比CopyOnWriteArrayList通过遍历老副本数组获取,准确性也大很多。

但是相比ConcurrentHashMap通过分段CAS统计,那么准确性则差不多。

注意:LinkedBlockingQueue也不能获取到100%准确的队列元素的个数。除非锁掉整个队列,调用size()时不允许入队和出队,才会是100%准确。因为是完成入队或出队之后,才会对AtomicInteger变量进行递增或递减。

二.迭代时获取两把锁来锁整个队列

LinkedBlockingQueue的遍历会直接锁整个队列,即会先获取两把锁。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁public int size() {return count.get();}public Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {fullyLock();try {current = head.next;if (current != null) {currentElement = current.item;}} finally {fullyUnlock();}}...}void fullyLock() {putLock.lock();takeLock.lock();}void fullyUnlock() {takeLock.unlock();putLock.unlock();}...
}

(7)对比LinkedBlockingQueue链表队列和ArrayBlockingQueue数组队列

一.LinkedBlockingQueue是基于链表实现的有界阻塞队列,ArrayBlockingQueue是基于数组实现的有界阻塞队列

二.ArrayBlockingQueue的整体实现原理与LinkedBlockingQueue的整体实现原理是一样的

三.LinkedBlockingQueue需要使用两把独占锁,分别锁出队和入队的场景

四.ArrayBlockingQueue只使用一把锁,锁整个数组,所以其入队和出队不能同时进行

五.ArrayBlockingQueue执行size()方法获取元素个数时会直接加独占锁

六.ArrayBlockingQueue和LinkedBlockingQueue执行迭代方法时都会锁数据

6.基于两个队列实现的集群同步机制

(1)服务注册中心集群需要实现的功能

(2)基于两个队列实现的集群同步机制

(3)使用ConcurrentLinkedQueue实现第一个队列

(4)使用LinkedBlockingQueue实现第二个队列

(5)集群同步机制的具体实现

(1)服务注册中心集群需要实现的功能

服务实例向任何一个服务注册中心实例发送注册、下线、心跳的请求,该服务注册中心实例都需要将这些信息同步到其他的服务注册中心实例上,从而确保所有服务注册中心实例的内存注册表的数据是一致的。

(2)基于两个队列实现的集群同步机制

某服务注册中心实例接收到服务实例A的请求时,首先会把服务实例A的服务请求信息存储到本地的内存注册表里,也就是把服务实例A的服务请求信息写到第一个内存队列中,之后该服务注册中心实例对服务实例A的请求处理就可以结束并返回。

接着该服务注册中心实例会有一个后台线程消费第一个内存队列里的数据,把消费到的第一个内存队列的数据batch打包然后写到第二个内存队列里。

最后该服务注册中心实例还有一个后台线程消费第二个内存队列里的数据,把消费到的第二个内存队列的数据同步到其他服务注册中心实例中。

(3)使用ConcurrentLinkedQueue实现第一个队列

首先有两种队列:

一是无界队列ConcurrentLinkedQueue,基于CAS实现,并发性能很高。

二是有界队列LinkedBlockingQueue,基于两把锁实现,并发性能一般。

LinkedBlockingQueue默认的队列长度是MAX_VALUE,所以可以看成是无界队列。但是也可以指定正常大小的队列长度,从而实现入队的阻塞,避免耗尽内存。

当服务注册中心实例接收到各种请求时,会先将请求信息放入第一个队列。所以第一个队列会存在高并发写的情况,因此LinkedBlockingQueue不合适。

因为LinkedBlockingQueue属于阻塞队列,如果LinkedBlockingQueue满了,那么服务注册中心实例中的,处理服务请求的线程,就会被阻塞住。而且LinkedBlockingQueue的并发性能也不是太高,要获取独占锁才能写,所以最好还是使用无界队列ConcurrentLinkedQueue来实现第一个队列。

(4)使用LinkedBlockingQueue实现第二个队列

消费第一个内存队列的数据时,可以按时间来进行batch打包,比如每隔500ms才将消费到的所有数据打包成一个batch消息。接着再将这个batch信息放入到第二个内存队列中,这样消费第二个队列的数据时,只需同步batch信息到集群其他实例即可。

可见对第二个队列进行的入队和出队操作是由少数的后台线程来执行的,因此可以使用有界队列LinkedBlockingQueue来实现第二个内存队列。

此外还要估算有界队列LinkedBlockingQueue的队列长度应设置多少才合适。假如每一条需要同步给集群其他实例的请求信息,有6个字段,占30字节。平均每一条batch信息会包含100条请求信息,也就是会占3000字节 = 3KB。那么1000条batch消息,才占用3000KB = 3MB。因此可以设置第二个内存队列LinkedBlockingQueue的长度为1000。

(5)集群同步机制的具体实现

//集群同步组件
public class PeersReplicator {//集群同步生成batch的间隔时间:500msprivate static final long PEERS_REPLICATE_BATCH_INTERVAL = 500;private static final PeersReplicator instance = new PeersReplicator();private PeersReplicator() {//启动接收请求和打包batch的线程AcceptorBatchThread acceptorBatchThread = new AcceptorBatchThread();acceptorBatchThread.setDaemon(true); acceptorBatchThread.start();//启动同步发送batch的线程PeersReplicateThread peersReplicateThread = new PeersReplicateThread();peersReplicateThread.setDaemon(true);peersReplicateThread.start(); }public static PeersReplicator getInstance() {return instance;}//第一个内存队列:处理高并发的服务请求,所以存在高并发的写入情况,无界队列private ConcurrentLinkedQueue<AbstractRequest> acceptorQueue = new ConcurrentLinkedQueue<AbstractRequest>();//第二个内存队列:有界队列,用于同步batch消息到其他集群实例private LinkedBlockingQueue<PeersReplicateBatch> replicateQueue = new LinkedBlockingQueue<PeersReplicateBatch>(10000);  //同步服务注册请求public void replicateRegister(RegisterRequest request) {request.setType(AbstractRequest.REGISTER_REQUEST); //将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步服务下线请求public void replicateCancel(CancelRequest request) {request.setType(AbstractRequest.CANCEL_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步发送心跳请求public void replicateHeartbeat(HeartbeatRequest request) {request.setType(AbstractRequest.HEARTBEAT_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//负责接收数据以及打包为batch的后台线程class AcceptorBatchThread extends Thread {long latestBatchGeneration = System.currentTimeMillis();@Overridepublic void run() {while(true) {try {//每隔500ms生成一个batchPeersReplicateBatch batch = new PeersReplicateBatch();long now = System.currentTimeMillis();if (now - latestBatchGeneration >= PEERS_REPLICATE_BATCH_INTERVAL) {//已经到了500ms的时间间隔//将batch消息放入第二个内存队列replicateQueue.offer(batch);//更新latestBatchGenerationlatestBatchGeneration = System.currentTimeMillis();//重置batchbatch = new PeersReplicateBatch();} else {//还没到500ms的时间间隔//从第一层队列获取数据,然后batch放入到第二层队列中AbstractRequest request = acceptorQueue.poll();if (request != null) {batch.add(request);  } else {Thread.sleep(100);}            }} catch (Exception e) {e.printStackTrace(); }}}}//集群同步线程class PeersReplicateThread extends Thread {@Overridepublic void run() {while(true) {try {PeersReplicateBatch batch = replicateQueue.take();if (batch != null) {//遍历其他的register-server地址//给每个地址的register-server都发送一个http请求同步batchSystem.out.println("给其他的register-server发送请求,同步batch......");      }} catch (Exception e) {e.printStackTrace(); }}}}
}//用于进行批量同步的batch消息
public class PeersReplicateBatch {private List<AbstractRequest> requests = new ArrayList<AbstractRequest>();public void add(AbstractRequest request) {this.requests.add(request);}public List<AbstractRequest> getRequests() {return requests;}public void setRequests(List<AbstractRequest> requests) {this.requests = requests;}
}//负责接收和处理register-client发送过来的请求的
public class RegisterServerController {//服务注册表private ServiceRegistry registry = ServiceRegistry.getInstance();//服务注册表的缓存private ServiceRegistryCache registryCache = ServiceRegistryCache.getInstance();//集群同步组件private PeersReplicator peersReplicator = PeersReplicator.getInstance();//服务注册public RegisterResponse register(RegisterRequest registerRequest) {RegisterResponse registerResponse = new RegisterResponse();try {//在注册表中加入这个服务实例ServiceInstance serviceInstance = new ServiceInstance();serviceInstance.setHostname(registerRequest.getHostname()); serviceInstance.setIp(registerRequest.getIp()); serviceInstance.setPort(registerRequest.getPort()); serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId()); serviceInstance.setServiceName(registerRequest.getServiceName());registry.register(serviceInstance);//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateRegister(registerRequest);registerResponse.setStatus(RegisterResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); registerResponse.setStatus(RegisterResponse.FAILURE);}return registerResponse;}//服务下线  public void cancel(CancelRequest cancelRequest) {//从服务注册中摘除实例registry.remove(cancelRequest.getServiceName(), cancelRequest.getServiceInstanceId());//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateCancel(cancelRequest);  }//发送心跳public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse = new HeartbeatResponse();try {//获取服务实例ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());if (serviceInstance != null) {serviceInstance.renew();}//记录一下每分钟的心跳的次数HeartbeatCounter heartbeatMessuredRate = HeartbeatCounter.getInstance();heartbeatMessuredRate.increment();//进行集群同步peersReplicator.replicateHeartbeat(heartbeatRequest);heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); }return heartbeatResponse;}//同步batch数据public void replicateBatch(PeersReplicateBatch batch) {for (AbstractRequest request : batch.getRequests()) {if (request.getType().equals(AbstractRequest.REGISTER_REQUEST)) {register((RegisterRequest) request);} else if (request.getType().equals(AbstractRequest.CANCEL_REQUEST)) {cancel((CancelRequest) request);} else if (request.getType().equals(AbstractRequest.HEARTBEAT_REQUEST)) {heartbeat((HeartbeatRequest) request); }}}//拉取全量注册表public Applications fetchFullRegistry() {return (Applications) registryCache.get(CacheKey.FULL_SERVICE_REGISTRY);}//拉取增量注册表public DeltaRegistry fetchDeltaRegistry() {return (DeltaRegistry) registryCache.get(CacheKey.DELTA_SERVICE_REGISTRY); }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/896019.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue项目启动时报错:error:0308010C:digital envelope routines::unsupported

此错误与 Node.js 的加密模块有关&#xff0c;特别是在使用 OpenSSL 3.0 及以上版本时。Vue 项目在启动时可能会依赖一些旧的加密算法&#xff0c;而这些算法在 OpenSSL 3.0 中默认被禁用&#xff0c;导致 error:0308010C:digital envelope routines::unsupported 错误。 解决…

ncDLRES:一种基于动态LSTM和ResNet的非编码RNA家族预测新方法

现有的计算方法主要分为两类&#xff1a;第一类是通过学习序列或二级结构的特征来预测ncRNAs家族&#xff0c;另一类是通过同源序列之间的比对来预测ncRNAs家族。在第一类中&#xff0c;一些方法通过学习预测的二级结构特征来预测ncRNAs家族。二级结构预测的不准确性可能会导致…

爱普生 SG-8101CE 可编程晶振在笔记本电脑的应用

在笔记本电脑的精密架构中&#xff0c;每一个微小的元件都如同精密仪器中的齿轮&#xff0c;虽小却对整体性能起着关键作用。如今的笔记本电脑早已不再局限于简单的办公用途&#xff0c;其功能愈发丰富多样。从日常轻松的文字处理、网页浏览&#xff0c;到专业领域中对图形处理…

SPRING10_getBean源码详细解读、流程图

文章目录 ①. getBean方法的入口-DefaultListableBeanFactory②. DefaultListableBeanFactory调用getBean③. 进入到doGetBean方法④. getSingleton三级缓存方法⑤. getSingleton()方法分析⑥. createBean创建对象方法⑦. 对象创建、属性赋值、初始化⑧. getBean最详细流程图 ①…

IDEA中查询Maven项目的依赖树

在Maven项目中&#xff0c;查看项目的依赖树是一个常见的需求&#xff0c;特别是当你需要了解项目中直接或间接依赖了哪些库及其版本时。你可以通过命令行使用Maven的dependency:tree插件来做到这一点。这个命令会列出项目中所有依赖的树状结构。 打开idea项目的终端&#xff…

windows 安装 stable diffusion

在windows上安装 stable diffusion&#xff0c;如果windows没有nvidia显卡&#xff0c;想只使用CPU可在webui-user.bat中添加命令 set COMMANDLINE_ARGS--no-half --skip-torch-cuda-test 可正常使用stable diffusion&#xff0c;但速度较慢

DeepSeek 助力 Vue 开发:打造丝滑的缩略图列表(Thumbnail List)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

DeepSeek写俄罗斯方块手机小游戏

DeepSeek写俄罗斯方块手机小游戏 提问 根据提的要求&#xff0c;让DeepSeek整理的需求&#xff0c;进行提问&#xff0c;内容如下&#xff1a; 请生成一个包含以下功能的可运行移动端俄罗斯方块H5文件&#xff1a; 核心功能要求 原生JavaScript实现&#xff0c;适配手机屏幕 …

百问网(100ask)的IMX6ULL开发板的以太网控制器(MAC)与物理层(PHY)芯片(LAN8720A)连接的原理图分析(包含各引脚说明以及工作原理)

前言 本博文承接博文 https://blog.csdn.net/wenhao_ir/article/details/145663029 。 本博文和博文 https://blog.csdn.net/wenhao_ir/article/details/145663029 的目录是找出百问网(100ask)的IMX6ULL开发板与NXP官方提供的公板MCIMX6ULL-EVK(imx6ull14x14evk)在以太网硬件…

QT开发技术 【opencv图片裁剪,平均哈希相似度判断,以及获取游戏窗口图片】

一、图片裁剪 int CJSAutoWidget::GetHouseNo(cv::Mat matMap) {cv::imwrite(m_strPath "/Data/map.png", matMap);for (int i 0; i < 4; i){for (int j 0; j < 6; j){// 计算当前子区域的矩形cv::Rect roi(j * 20, i * 17, 20, 17);// 提取子区域cv::Mat …

TiDB 是一个分布式 NewSQL 数据库

TiDB 是一个分布式 NewSQL 数据库。它支持水平弹性扩展、ACID 事务、标准 SQL、MySQL 语法和 MySQL 协议&#xff0c;具有数据强一致的高可用特性&#xff0c;是一个不仅适合 OLTP 场景还适合 OLAP 场景的混合数据库。 TiDB是 PingCAP公司自主设计、研发的开源分布式关系型数据…

mysql 学习15 SQL优化,插入数据优化,主键优化,order by优化,group by 优化,limit 优化,count 优化,update 优化

插入数据优化&#xff0c; insert 优化&#xff0c; 批量插入&#xff08;一次不超过1000条&#xff09; 手动提交事务 主键顺序插入 load 从本地一次插入大批量数据&#xff0c; 登陆时 mysql --local-infile -u root -p load data local infile /root/sql1.log into table tb…

使用JWT实现微服务鉴权

目录 一、微服务鉴权 1、思路分析 2、系统微服务签发token 3、网关过滤器验证token 4、测试鉴权功能 前言&#xff1a; 随着微服务架构的广泛应用&#xff0c;服务间的鉴权与安全通信成为系统设计的核心挑战之一。传统的集中式会话管理在分布式场景下面临性能瓶颈和扩展性…

广西壮族自治区园区投促中心党委书记陶德文率团到访深兰科技

2月16日&#xff0c;广西壮族自治区园区投促中心党委书记、主任&#xff0c;自治区园区办党组成员陶德文率团来到深兰科技集团上海总部考察调研&#xff0c;并与深兰科技集团创始人、董事长陈海波等集团管理层座谈交流&#xff0c;双方围绕深兰科技人工智能项目落地广西的相关事…

基于UnrealEngine(UE5)的太空探索

视频部分可参见&#xff1a;https://www.bilibili.com/video/BV1JWA8eSEVg/ 中国 天宫号 空间站 人造卫星可视化 星链卫星可视化 小行星分布及运动轨迹可视化 月球基地 可视化 八大行星轨道 太阳系宜居带可视化 阿波罗8号拍摄的地球升起 谷神星模型及轨迹可视化 星座可视化 十…

WLAN无线2.4G/5G频段划分和可用信道

互联网各领域资料分享专区(不定期更新)&#xff1a; Sheet

使用 OpenTelemetry 和 Langtrace 的 Elastic 分发跟踪基于 RAG 的聊天机器人

作者&#xff1a;来自 Elastic Bahubali Shetti 如何使用 Elastic 观察基于 OpenAI RAG 的应用程序。使用 Langtrace 对应用程序进行检测&#xff0c;收集日志、跟踪、指标&#xff0c;并了解 LLM 在 Kubernetes 上使用 OpenTelemetry 的 Elastic Distributions 的运行情况。 目…

基于机器学习的水文数据采集预测与可视化分析系统

【机器学习】基于机器学习的水文数据采集预测与可视化分析系统&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 系统采用Python及Flask框架构建Web服务端&#xff0c;结合PyMySQL与MySQL实现数据…

三甲医院网络架构与安全建设实战

一、设计目标 实现医疗业务网/卫生专网/互联网三网隔离 满足等保2.0三级合规要求 保障PACS影像系统低时延传输 实现医疗物联网统一接入管控 二、全网拓扑架构 三、网络分区与安全设计 IP/VLAN规划表 核心业务配置&#xff08;华为CE6865&#xff09; interface 100G…