用多线程来简易的实现一个消息队列的生产者消费者模式:10个线程生成数字消息投递到消息队列,10个线程做数字消息的消费者,消费生产者投递的消息。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class ProducerConsumerDemo {private static final int QUEUE_CAPACITY = 50;private static final int PRODUCER_COUNT = 10;private static final int CONSUMER_COUNT = 10;public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);// 创建生产者线程for (int i = 0; i < PRODUCER_COUNT; i++) {new Thread(new Producer(queue), "Producer-" + i).start();}// 创建消费者线程for (int i = 0; i < CONSUMER_COUNT; i++) {new Thread(new Consumer(queue), "Consumer-" + i).start();}}
}class Producer implements Runnable {private final BlockingQueue<Integer> queue;private static int count = 0;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {int number = produce();queue.put(number);System.out.println(Thread.currentThread().getName() + " produced: " + number);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private int produce() {return count++;}
}class Consumer implements Runnable {private final BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {int number = queue.take();consume(number);System.out.println(Thread.currentThread().getName() + " consumed: " + number);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void consume(int number) {// 消费数字的逻辑}
}
BlockingQueue
是 Java 中一个用于线程间通信的接口,它具有以下几个重要特性:
-
阻塞操作:
BlockingQueue
提供了阻塞的put
和take
操作,这意味着如果队列满了,调用put
方法的线程会被阻塞,直到队列有空闲位置;如果队列空了,调用take
方法的线程会被阻塞,直到队列中有新元素被加入。- 这使得
BlockingQueue
特别适用于生产者-消费者模式。
-
线程安全:
BlockingQueue
本质上是线程安全的,所有的插入、移除和检查操作都是原子的,并且使用了内部锁和其他同步机制来保证线程安全。
-
不同的实现:
BlockingQueue
接口有多种实现,每种实现具有不同的特性和用途:ArrayBlockingQueue
:一个有界的阻塞队列,内部实现是数组。LinkedBlockingQueue
:一个可选有界的阻塞队列,内部实现是链表。PriorityBlockingQueue
:一个无界的阻塞优先队列,内部元素按优先级排序。DelayQueue
:一个无界的阻塞队列,其中元素只有在其延迟期满时才能被取走。SynchronousQueue
:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作,反之亦然。LinkedTransferQueue
:一个无界的TransferQueue
,它在没有消费者时不会积压生产者的数据。
-
容量控制:
BlockingQueue
可以有界,也可以无界。有界队列在创建时可以指定容量,控制队列中的元素数量,以避免过多的内存消耗。put
和offer
操作可以被用来添加元素到队列,put
操作在队列满时会阻塞,而offer
操作则会在队列满时返回false
。
-
无等待队列操作:
BlockingQueue
还提供了一些无等待的队列操作,例如poll
和offer
,它们不会阻塞当前线程,而是在特定条件不满足时立即返回。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class BlockingQueueExample {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // 有界队列,容量为5// 生产者线程Thread producer = new Thread(() -> {try {for (int i = 0; i < 10; i++) {queue.put(i); // 如果队列满了,会阻塞System.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {while (true) {Integer item = queue.take(); // 如果队列空了,会阻塞System.out.println("Consumed: " + item);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();producer.join();consumer.join();}
}