1.阻塞队列
1.1阻塞队列是什么
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
1.当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
2.当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 "生产者消费者模型". 这是一种非常典型的开发模型.
1.2生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.
比如在 "秒杀" 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程).这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
这样做可以有效进行“削峰”,防止服务器被突然到来的一波请求直接冲垮
2) 阻塞队列也能使生产者和消费者之间 解耦.
意思就是消费者只管消费阻塞队列中的数据,而不关心生产这些数据的人是谁,怎么生产的。
1.3标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
put 方法用于阻塞式的入队列, take方法用于阻塞式的出队列.
BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
//入队列
queue.put("abc");
// 出队列 . 如果没有 put 直接 take, 就会阻塞 .
String elem = queue.take();
生产者消费者模型
package thread;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class ThreadDemo13 {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();Thread customer = new Thread(()->{while(true) {try {int key = blockingQueue.take();System.out.println(Thread.currentThread().getName()+ "消费:" + key);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}},"消费者");Thread producer = new Thread(()-> {Random r = new Random();while(true){int x = r.nextInt(100);try {blockingQueue.put(x);System.out.println(Thread.currentThread().getName() +"生产:"+x);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}},"生产者");customer.start();producer.start();customer.join();producer.join();}
}
1.4阻塞队列实现
通过 "循环队列" 的方式来实现.
使用 synchronized 进行加锁控制.
put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait.被唤醒时不一 定队列就不满了, 因为同时可能是唤醒了多个线程).
take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
public class MyBlockingQueue {private final String[] elem = new String[1000];private int head = 0;private int tail = 0;private int size = 0;final Object locker = new Object();public void put(String key) throws InterruptedException {synchronized (locker) {// 此处最好使用 while.// 否则 notifyAll 的时候 , 该线程从 wait 中被唤醒 ,// 但是紧接着并未抢占到锁 . 当锁被抢占的时候 , 可能又已经队列满了 // 就只能继续等待while (size >= elem.length) {locker.wait();}elem[tail] = key;tail++;size++;if (tail >= elem.length) {tail = 0;}locker.notify();}}public String take() throws InterruptedException {synchronized (locker) {while (size == 0) {locker.wait();}String key = elem[head];head++;size--;locker.notify();return key;}}public static void main(String[] args) throws InterruptedException {MyBlockingQueue myBlockingQueue = new MyBlockingQueue();Thread producer = new Thread(()->{int count = 0;while(true){try {myBlockingQueue.put(count+"");System.out.println("生产者生产:"+count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread customer = new Thread(()->{while(true){try {String res = myBlockingQueue.take();System.out.println("消费者消费:"+res);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();customer.start();}
}
2.定时器
2.1定时器是什么
定时器也是软件开发中的一个重要组件. 类似于一个 "闹钟". 达到一个设定的时间之后, 就执行某个指定好的代码,比如网络通信中, 如果对方 500ms 内没有返回数据, 则断开连接尝试重连.
比如一个 Map, 希望里面的某个 key 在 3s 之后过期(自动删除).类似于这样的场景就需要用到定时器.
2.2标准库中的定时器
标准库中提供了一个 Timer 类. Timer 类的核心方法为schedule包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后 执行 (单位为毫秒).
Timer timer = new Timer();
timer.schdule(new TimeTask(){@Override public void run(){System.out.println("hello"); }
},3000);
2.3实现定时器
一个带优先级的阻塞队列
为啥要带优先级呢?
因为阻塞队列中的任务都有各自的执行时刻 (delay).最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来.
队列中的每个元素是一个 Task 对象.
Task 中带有一个时间属性, 队首元素就是即将执行的元素
同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
import java.util.PriorityQueue;// 创建一个类, 用来描述定时器中的一个任务
class MyTimerTask implements Comparable<MyTimerTask> {// 任务啥时候执行. 毫秒级的时间戳.private long time;// 任务具体是啥.private Runnable runnable;public MyTimerTask(Runnable runnable, long delay) {// delay 是一个相对的时间差. 形如 3000 这样的数值.// 构造 time 要根据当前系统时间和 delay 进行构造.time = System.currentTimeMillis() + delay;this.runnable = runnable;}public long getTime() {return time;}public Runnable getRunnable() {return runnable;}@Overridepublic int compareTo(MyTimerTask o) {// 认为时间小的, 优先级高. 最终时间最小的元素, 就会放到队首.// 怎么记忆, 这里是谁减去谁?? 不要记!! 记容易记错~~// 随便写一个顺序, 然后实验一下就行了.return (int) (this.time - o.time);// return (int) (o.time - this.time);}
}// 定时器类的本体
class MyTimer {// 使用优先级队列, 来保存上述的 N 个任务private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();// 用来加锁的对象private Object locker = new Object();// 定时器的核心方法, 就是把要执行的任务添加到队列中.public void schedule(Runnable runnable, long delay) {synchronized (locker) {MyTimerTask task = new MyTimerTask(runnable, delay);queue.offer(task);// 每次来新的任务, 都唤醒一下之前的扫描线程. 好让扫描线程根据最新的任务情况, 重新规划等待时间.locker.notify();}}// MyTimer 中还需要构造一个 "扫描线程", 一方面去负责监控队首元素是否到点了, 是否应该执行; 一方面当任务到点之后,// 就要调用这里的 Runnable 的 Run 方法来完成任务public MyTimer() {// 扫描线程Thread t = new Thread(() -> {while (true) {try {synchronized (locker) {while (queue.isEmpty()) {// 注意, 当前如果队列为空, 此时就不应该去取这里的元素.// 此处使用 wait 等待更合适. 如果使用 continue, 就会使这个线程 while 循环运行的飞快,// 也会陷入一个高频占用 cpu 的状态(忙等).locker.wait();}MyTimerTask task = queue.peek();long curTime = System.currentTimeMillis();if (curTime >= task.getTime()) {// 假设当前时间是 14:01, 任务时间是 14:00, 此时就意味着应该要执行这个任务了.// 需要执行任务.queue.poll();task.getRunnable().run();} else {// 让当前扫描线程休眠一下, 按照时间差来进行休眠.// Thread.sleep(task.getTime() - curTime);locker.wait(task.getTime() - curTime);}}} catch (InterruptedException e) {e.printStackTrace();}}});t.start();}
}// 写一个定时器
public class TimeTest {public static void main(String[] args) {MyTimer timer = new MyTimer();timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello 3");}}, 3000);timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello 2");}}, 2000);timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello 1");}}, 1000);System.out.println("程序开始运行");}
}