java并发队列
在内部,这些实现巧妙地利用了Java 1.5中引入的另一种并发功能-阻塞队列。
队列
首先,简要回顾一下什么是标准队列。 在计算机科学中,队列只是一个集合,始终将元素添加到末尾,并且始终从头开始获取元素。 表达式先进先出(FIFO)通常用于描述标准队列。 Java 1.6中引入的是Deque或双端队列,并且此接口现在在LinkedList上实现。 Java中的某些队列允许其他排序,例如使用Comparator甚至编写自己的排序实现。 尽管扩展功能很好,但是我们今天关注的是BlockingQueues如何真正在并发开发中大放异彩。
阻塞队列
阻塞队列是一些队列,它们还公开了在没有可用元素的情况下阻止检索元素的请求的功能,该附加选项可以限制等待时间。 在受限制的大小队列上,尝试添加时可以使用相同的阻止功能。 让我们深入探讨一下BlockingQueue用法的示例。
让我们假设一个简单的场景。 您有一个处理线程,其功能只是执行命令。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();public void addCommand(Command command) {workQueue.offer(command);
}public Object call() throws Exception {try {Command command = workQueue.take();command.execute();} catch (InterruptedException e) {throw new WorkException(e);}
}
当然,这是一个非常简单的示例,但它向您展示了对多个线程使用BlockingQueue的基本知识。 让我们尝试一些更多的事情。 在此示例中,我们需要创建一个具有限制的连接池。 它仅应根据需要创建连接。 没有客户端等待超过5秒的可用连接。
private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>(10);
private AtomicInteger connCount = new AtomicInteger();public Connection getConnection() {Connection conn = pool.poll(5, TimeUnit.SECONDS);if (conn == null) {synchronized (connCount) {if (connCount.get() < 10) {conn = getNewConnection();pool.offer(conn);connCount.incrementAndGet();}}if (conn == null) {throw new ConnUnavailException();} else {return conn;}}
}
最后,让我们考虑一个有趣的实现示例示例SynchronousQueue 。
在此示例中,类似于我们的第一个示例,我们想要执行一个Command,但是需要知道它何时完成,最多等待2分钟。
private BlockingQueue workQueue = new LinkedBlockingQueue();
private Map commandQueueMap = new ConcurrentHashMap(); public SynchronousQueue addCommand(Command command) {SynchronousQueue queue = new SynchronousQueue();commandQueueMap.put(command, queue);workQueue.offer(command);return queue;
}public Object call() throws Exception {try {Command command = workQueue.take();Result result = command.execute();SynchronousQueue queue = commandQueueMap.get(command);queue.offer(result);return null;} catch (InterruptedException e) {throw new WorkException(e);}
}
现在,使用者可以根据请求安全地轮询超时,以执行其命令。
Command command;
SynchronousQueue queue = commandRunner.addCommand(command);
Result result = queue.poll(2, TimeUnit.MINUTES);
if (result == null) {throw new CommandTooLongException(command);
} else {return result;
}
正如您开始看到的那样,java中的BlockingQueues提供了很大的灵活性,并为您提供了相对简单的结构来满足多线程应用程序中的许多(如果不是全部)需求。 我们甚至没有审查过一些非常整洁的BlockingQueues ,例如PriorityBlockingQueue和DelayQueue 。 看看他们并取得联系。 我们喜欢与开发人员交谈。
参考: Carfey Software博客上的JCG合作伙伴的Java并发第5部分-阻塞队列 。
- Java并发教程–信号量
- Java并发教程–重入锁
- Java并发教程–线程池
- Java并发教程–可调用,将来
- Java并发教程– CountDownLatch
- Exchanger和无GC的Java
- Java Fork / Join进行并行编程
- 使用迭代器时如何避免ConcurrentModificationException
- 改善Java应用程序性能的快速技巧
翻译自: https://www.javacodegeeks.com/2011/09/java-concurrency-tutorial-blocking.html
java并发队列