同时开10个线程存入和取出100万的数据,结论如下:
DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue
执行结果如下:
100万 DoubleBufferedQueue入队时间:9510 出队时间:10771
100万 DoubleBufferedQueue入队时间:8169 出队时间:9789
1000万 DoubleBufferedQueue入队时间:98285 出队时间:101088
1000万 DoubleBufferedQueue入队时间:101859 出队时间:105964
100万 ConcurrentLinkedQueue入队时间:10557 出队时间:13716
100万 ConcurrentLinkedQueue入队时间:25298 出队时间:25332
1000万 ConcurrentLinkedQueue队列时间:121868 出队时间:136116
1000万 ConcurrentLinkedQueue队列时间:134306 出队时间:147893
100万 ArrayBlockingQueue入队时间:21080 出队时间:22025
100万 ArrayBlockingQueue入队时间:17689 出队时间:19654
1000万 ArrayBlockingQueue入队时间:194400 出队时间:205968
1000万 ArrayBlockingQueue入队时间:192268 出队时间:197982
100万 LinkedBlockingQueue入队时间:38236 出队时间:52555
100万 LinkedBlockingQueue入队时间:30646 出队时间:38573
1000万 LinkedBlockingQueue入队时间:375669 出队时间:391976
1000万 LinkedBlockingQueue入队时间:701363 出队时间:711217
doubleBufferedQueue:
package test.MoreThread.d;import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import test.MoreThread.l.linkedBlockingQueue; import comrt.util.DoubleBufferedQueue;//DoubleBufferedQueue入队时间:9510 出队时间:10771 //DoubleBufferedQueue入队时间:8169 出队时间:9789 public class doubleBufferedQueue {private static final Logger log = LoggerFactory.getLogger(doubleBufferedQueue.class);public final static int size1 = 1000000;public static DoubleBufferedQueue<Object> queue = new DoubleBufferedQueue<Object>(size1);public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {// long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new ExecDoubleBufferedQueue());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}doubleBufferedQueue.isOver = true;log.info("入队列总共执行时间:" + allTime);}});thread1.start();// log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));// ------------------------------Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new ExecDoubleBufferedQueue_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出队列总共执行时间:" + allTime_out);}});thread2.start();} }class ExecDoubleBufferedQueue implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(doubleBufferedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < doubleBufferedQueue.size1; i++) {doubleBufferedQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }class ExecDoubleBufferedQueue_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(doubleBufferedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!doubleBufferedQueue.isOver) {doubleBufferedQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }
concurrentLinkedQueue:
package test.MoreThread.c;import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory;//ConcurrentLinkedQueue入队时间:10557 出队时间:13716 //ConcurrentLinkedQueue入队时间:25298 出队时间:25332 public class concurrentLinkedQueue {private static final Logger log = LoggerFactory.getLogger(concurrentLinkedQueue.class);public static ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();public final static int size1 = 1000000;public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {// long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new Exec());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get(); // log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}concurrentLinkedQueue.isOver = true;log.info("队列总共执行时间:" + allTime);}});thread1.start();// ------------------------------Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new Exec_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出队列总共执行时间:" + allTime_out);}});thread2.start();// log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); } }class Exec implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(concurrentLinkedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < concurrentLinkedQueue.size1; i++) {concurrentLinkedQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2);return time2;} }class Exec_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(concurrentLinkedQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!concurrentLinkedQueue.isOver) {concurrentLinkedQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }
arrayBlockingQueue:
package test.MoreThread.a;import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory;//ArrayBlockingQueue入队时间:21080 出队时间:22025 //ArrayBlockingQueue入队时间:17689 出队时间:19654 public class arrayBlockingQueue {private static final Logger log = LoggerFactory.getLogger(arrayBlockingQueue.class);public final static int size1 = 1000000;public static ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(size1);public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {// long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new ExecArrayBlockingQueue());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}arrayBlockingQueue.isOver = true;log.info("队列总共执行时间:" + allTime);}});thread1.start();// log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));// ------------------------------Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new ExecArrayBlockingQueue_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出队列总共执行时间:" + allTime_out);}});thread2.start();} }class ExecArrayBlockingQueue implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(arrayBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < arrayBlockingQueue.size1; i++) {arrayBlockingQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }class ExecArrayBlockingQueue_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(arrayBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!arrayBlockingQueue.isOver) {arrayBlockingQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }
linkedBlockingQueue:
package test.MoreThread.l;import java.util.ArrayList; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory;//LinkedBlockingQueue入队时间:38236 出队时间:52555 //LinkedBlockingQueue入队时间:30646 出队时间:38573 public class linkedBlockingQueue {private static final Logger log = LoggerFactory.getLogger(linkedBlockingQueue.class);public final static int size1 = 1000000;public static LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(size1);public final static int threadNumber = 10;public static boolean isOver = false;public static void main(String[] args) throws InterruptedException,ExecutionException {long timestart = System.currentTimeMillis();Thread thread1 = new Thread(new Runnable() {public void run() {ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService.submit(new ExecLinkedBlockingQueue());results.add(future);}long allTime = 0;for (Future<Long> fs : results) {try {allTime += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService.shutdown();}}linkedBlockingQueue.isOver = true;log.info("入队列总共执行时间:" + allTime);}});thread1.start();// log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // System.out.println(linkedBlockingQueue.queue.size());// ------------------------------ Thread thread2 = new Thread(new Runnable() {public void run() {ExecutorService executorService2 = Executors.newFixedThreadPool(threadNumber);ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();for (int i = 0; i < threadNumber; i++) {Future<Long> future = executorService2.submit(new ExecLinkedBlockingQueue_Out());results_out.add(future);}long allTime_out = 0;for (Future<Long> fs : results_out) {try {allTime_out += fs.get();// log.info("" + fs.get());} catch (InterruptedException e) {log.info("" + e);return;} catch (ExecutionException e) {log.info("" + e);} finally {executorService2.shutdown();}}log.info("出队列总共执行时间:" + allTime_out);}});thread2.start();} }class ExecLinkedBlockingQueue implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(linkedBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();for (int i = 0; i < linkedBlockingQueue.size1; i++) {linkedBlockingQueue.queue.offer(i);}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }class ExecLinkedBlockingQueue_Out implements Callable<Long> {private static final Logger log = LoggerFactory.getLogger(linkedBlockingQueue.class);@Overridepublic Long call() throws Exception {long time = System.currentTimeMillis();while (!linkedBlockingQueue.isOver) {linkedBlockingQueue.queue.poll();}long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2);return time2;} }
DoubleBufferedQueue双缓冲队列
package comrt.util;import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;import org.slf4j.Logger; import org.slf4j.LoggerFactory;//双缓冲队列,线程安全 public class DoubleBufferedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = 1011398447523020L;public static final int DEFAULT_QUEUE_CAPACITY = 5000000;public static final long DEFAULT_MAX_TIMEOUT = 0;public static final long DEFAULT_MAX_COUNT = 10;private Logger logger = LoggerFactory.getLogger(DoubleBufferedQueue.class.getName());/** The queued items */private ReentrantLock readLock;// 写锁private ReentrantLock writeLock;// 是否满private Condition notFull;private Condition awake;// 读写数组private transient E[] writeArray;private transient E[] readArray;// 读写计数private volatile int writeCount;private volatile int readCount;// 写数组下标指针private int writeArrayTP;private int writeArrayHP;// 读数组下标指针private int readArrayTP;private int readArrayHP;private int capacity;public DoubleBufferedQueue(int capacity) {// 默认this.capacity = DEFAULT_QUEUE_CAPACITY;if (capacity > 0) {this.capacity = capacity;}readArray = (E[]) new Object[capacity];writeArray = (E[]) new Object[capacity];readLock = new ReentrantLock();writeLock = new ReentrantLock();notFull = writeLock.newCondition();awake = writeLock.newCondition();}private void insert(E e) {writeArray[writeArrayTP] = e;++writeArrayTP;++writeCount;}private E extract() {E e = readArray[readArrayHP];readArray[readArrayHP] = null;++readArrayHP;--readCount;return e;}/*** switch condition: read queue is empty && write queue is not empty* * Notice:This function can only be invoked after readLock is grabbed,or may* cause dead lock* * @param timeout* @param isInfinite* : whether need to wait forever until some other thread awake* it* @return* @throws InterruptedException*/private long queueSwap(long timeout, boolean isInfinite) throws InterruptedException {writeLock.lock();try {if (writeCount <= 0) {// logger.debug("Write Count:" + writeCount// + ", Write Queue is empty, do not switch!");try {// logger.debug("Queue is empty, need wait....");if (isInfinite && timeout <= 0) {awake.await();return -1;} else if (timeout > 0) {return awake.awaitNanos(timeout);} else {return 0;}} catch (InterruptedException ie) {awake.signal();throw ie;}} else {E[] tmpArray = readArray;readArray = writeArray;writeArray = tmpArray;readCount = writeCount;readArrayHP = 0;readArrayTP = writeArrayTP;writeCount = 0;writeArrayHP = readArrayHP;writeArrayTP = 0;notFull.signal();// logger.debug("Queue switch successfully!");return 0;}} finally {writeLock.unlock();}}@Overridepublic boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {if (e == null) {throw new NullPointerException();}long nanoTime = 0;if (timeout > 0) {nanoTime = unit.toNanos(timeout);}writeLock.lockInterruptibly();try {for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {if (writeCount < writeArray.length) {insert(e);if (writeCount == 1) {awake.signal();}return true;}// Time outif (nanoTime <= 0) {// logger.debug("offer wait time out!");return false;}// keep waitingtry {// logger.debug("Queue is full, need wait....");nanoTime = notFull.awaitNanos(nanoTime);} catch (InterruptedException ie) {notFull.signal();throw ie;}}} finally {writeLock.unlock();}return false;}// 取 @Overridepublic E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanoTime = 0;if (timeout > 0) {nanoTime = unit.toNanos(timeout);}readLock.lockInterruptibly();try {if (nanoTime > 0) {for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {if (readCount > 0) {return extract();}if (nanoTime <= 0) {// logger.debug("poll time out!");return null;}nanoTime = queueSwap(nanoTime, false);}} else {if (readCount > 0) {return extract();}queueSwap(nanoTime, false);if (readCount > 0) {return extract();} }} finally {readLock.unlock();}return null;}// 等待500毫秒 @Overridepublic E poll() {E ret = null;try {ret = poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);} catch (Exception e) {ret = null;}return ret;}// 查看 @Overridepublic E peek() {E e = null;readLock.lock();try {if (readCount > 0) {e = readArray[readArrayHP];}} finally {readLock.unlock();}return e;}// 默认500毫秒 @Overridepublic boolean offer(E e) {boolean ret = false;try {ret = offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);} catch (Exception e2) {ret = false;}return ret;}@Overridepublic void put(E e) throws InterruptedException {// never need to // block offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); }@Overridepublic E take() throws InterruptedException {return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);}@Overridepublic int remainingCapacity() {return this.capacity;}@Overridepublic int drainTo(Collection<? super E> c) {return 0;}@Overridepublic int drainTo(Collection<? super E> c, int maxElements) {return 0;}@Overridepublic Iterator<E> iterator() {return null;}// 当前读队列中还有多少个 @Overridepublic int size() {int size = 0;readLock.lock();try {size = readCount;} finally {readLock.unlock();}return size;}/*** 当前已写入的队列大小* */public int WriteSize() {int size = 0;writeLock.lock();try {size = writeCount;} finally {writeLock.unlock();}return size;}public int unsafeReadSize() {return readCount;}public int unsafeWriteSize() {return writeCount;}public int capacity() {return capacity;}public String toMemString() {return "--read: " + readCount + "/" + capacity + "--write: " + writeCount + "/" + capacity;}// 清理/** public void clear() { readLock.lock(); writeLock.lock(); try { readCount* = 0; readArrayHP = 0; writeCount = 0; writeArrayTP = 0;* //logger.debug("Queue clear successfully!"); } finally {* writeLock.unlock(); readLock.unlock(); } }*/ }