阻塞队列
kafka是目前来说性能最好的消息队列服务器,能处理TB级别的数据
作用:点赞、评论时,服务器会自动给某个用户发送通知
kafka是个框架,如果不用框架还要解决类似问题,就要用到阻塞队列
BlockingQueue
阻塞队列就是一个接口BlockingQueue,java核心API中的接口
-
作用:解决线程通信的问题。
wait/notify等也可以解决线程通信问题,但更加麻烦,因此如果要在API中选一种方法解决线程通信问题,选择阻塞队列更好
-
阻塞方法:put、take。
-
生产者消费者模式
- 生产者:产生数据的线程。
- 消费者:使用数据的线程。
假设线程1为生产者,线程2为消费者,两线程直接打交道可能产生问题:
-
生产速度过快,远快于消费速度。当线程2不再需要数据后,线程1没有被阻塞,仍在占用cpu生产,造成资源浪费
-
生产速度很慢,赶不上消费速度。当线程2把数据使用完后,没有被阻塞,仍占用着cpu获取数据,但取不到,也造成系统资源浪费
BlockingQueue就是在生产者和消费者之间搭建了一个桥梁,构造一个缓冲空间,可避免两线程直接打交道。
线程1用put方法往队列里存数据,线程2用take方法从队列中取数据。故线程1为生产者,线程2为消费者。
线程1将生产的数据放在队列里,若队列未满,则继续生产;若队列满了,线程1就会被阻塞
线程2从队列中取数据,若队列不空,则继续取;若队列为空,线程2被阻塞
阻塞:等着,什么也不做,不会占用cpu资源,对系统性能没有影响。
BlockingQueue的实现类
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue、SynchronousQueue、DelayQueue等
阻塞队列代码实例
public class BlockingQueueTests {public static void main(String[] args) {// 构造容量为10的阻塞队列BlockingQueue queue = new ArrayBlockingQueue(10);// 创建一个生产者线程,并启动new Thread(new Producer(queue)).start();// 创建多个消费者线程,并启动new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();}}// 生产者类,用于构造生产者线程
class Producer implements Runnable {// 要求调用方在调用生产者线程时,要将阻塞队列传入,因为线程是要交给阻塞队列去管理的private BlockingQueue<Integer> queue;// 阻塞队列:通过泛型声明阻塞队列中存什么// 有参构造器:实例化时传入阻塞队列public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {// 生产者每隔20s生产一次数据,一共生产100次,将数据交给队列管理for (int i = 0; i < 100; i++) {Thread.sleep(20);queue.put(i);// 打印: 哪个线程在生产,生产后阻塞队列中有多少个数System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());}} catch (Exception e) {e.printStackTrace();}}}// 消费者
class Consumer implements Runnable {private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {// 消费者只要有数据就一直消费while (true) {// 消费者消费数据的时间间隔不是确定的,不能写死// 大概率生成的随机数比20大,即大概率生产速度快于消费速度Thread.sleep(new Random().nextInt(1000));// 弹出队首数据queue.take();// 打印:消费者是谁.消费后阻塞队列里有多少数据System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());}} catch (Exception e) {e.printStackTrace();}}
}
一些选择题
A