七、并发工具(上)

一、自定义线程池

1)背景:

  • 在 QPS 量比较高的情况下,我们不可能说所有的访问都创建一个线程执行,这会导致内存占用过高,甚至有可能出现 out of memory
  • 另外也要考虑 cpu 核数,如果请求超过了cpu核数,那么有一部分线程就会收到限制,然后等cpu时间片结束会进行一个上下文切换,频繁的上下文也会影响性能。

总和以上,我们可以引出线程池的概念,也就是结合前面的享元模式,创建一批线程,复用线程,既可以较少内存占用,又可以较少线程上下文切换。

2)任务数量放不满 Blocking Queue

import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i = 0; i < 5; i ++ ) {int j = i;threadPool.execute(() -> {log.debug("{}", j);});}}
}
@Slf4j(topic = "c.TestPool")
class ThreadPool {// 线程池大小private int capacity;// 队列private BlockQueue<Runnable> blockQueue;// 线程队列private HashSet<Worker> workers = new HashSet<>();private long timeout;private TimeUnit timeUnit;public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize) {this.timeout = timeout;this.timeUnit = unit;this.capacity = capacity;blockQueue = new BlockQueue<>(queueSize);}// 执行任务public void execute(Runnable runnable) {synchronized (workers) {if (workers.size() < capacity) {Worker worker = new Worker(runnable);log.debug("开始创建线程...{}", worker);workers.add(worker);worker.start();} else {log.debug("线程已满...加入队列");blockQueue.put(runnable);}}}class Worker extends Thread{private Runnable runnable;public Worker(Runnable runnable) {this.runnable = runnable;}@Overridepublic void run() {//执行任务// 1) 当任务不为空是,执行任务// 2) 当任务执行完毕,接着从队列中获取任务执行//这里有赋值不用上锁,是因为poll跟take实现都加了锁//            while ((runnable != null) || (runnable = blockQueue.take()) != null) {while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行。。。{}", runnable);runnable.run();} catch (Exception e) {throw new RuntimeException(e);} finally {runnable = null;}}synchronized (workers) {log.debug("{} 被移除了", this);workers.remove(this);}}}
}@Slf4j(topic = "c.TestPool")
class BlockQueue<T> {// 队列大小private int capacity;//存放队列private Deque<T> deque = new ArrayDeque<>();// 一把锁private ReentrantLock lock = new ReentrantLock();// 等待条件private Condition fullCondition = lock.newCondition();private Condition emptyCondition = lock.newCondition();public BlockQueue(int capacity) {this.capacity = capacity;}public int size() {return deque.size();}// 获取任务 设置超时时间public T poll(long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.isEmpty()) {try {if (nanos <= 0) {log.debug("超时结束");return null;}//进行等待nanos = emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 获取任务public T take() {try {lock.lock();while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}//            log.debug("删除队列头...");return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 阻塞添加存放任务public void put(T runnable) {try {lock.lock();while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("存放到队列尾");deque.addLast(runnable);emptyCondition.signal();} finally {lock.unlock();}}
}

2)任务数量放满 Blocking Queue (改进)

1. 带超时时间的 阻塞添加

// 带超时时间的存放任务
public boolean offer(T runnable, long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {if (nanos <= 0) {log.debug("等待加入任务队列超时 {} ...", runnable);return false;}nanos = fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();return true;} finally {lock.unlock();}
}

2. 设计模式 之 策略模式

如果存放的队列已经满了,之前的版本就进入阻塞死等了,那么我们可以改进的方式

  • 带超时时间的等待
  • 让调用者放弃执行
  • 让调用者自己执行
  • 让调用者抛出异常
  • ... 等

有很多方式,如果写死的话,就会有很多else if,那么定死了,没有扩展性,所以使用策略模式,将权利下放,利用函数式接口,让调用者传进来拒绝策略。

import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {// 1、 死等//queue.put(task);// 2、 带超时时间等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3、 让调用者放弃
//            log.debug("放弃任务{}", task);// 4、 让调用者抛出异常
//            throw new RuntimeException("跑出异常" + task);// 5、 让调用者自己执行task.run();});for (int i = 0; i < 3; i ++ ) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);log.debug("{}", j);} catch (InterruptedException e) {throw new RuntimeException(e);}});}}
}@FunctionalInterface
interface RejectFunction<T> {void reject(BlockQueue<T> tBlockQueue, T task);}
@Slf4j(topic = "c.TestPool")
class ThreadPool {// 线程池大小private int capacity;// 队列private BlockQueue<Runnable> blockQueue;// 线程队列private HashSet<Worker> workers = new HashSet<>();private long timeout;private TimeUnit timeUnit;private RejectFunction<Runnable> rejectFunction;public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize, RejectFunction<Runnable> rejectFunction) {this.timeout = timeout;this.timeUnit = unit;this.capacity = capacity;blockQueue = new BlockQueue<>(queueSize);this.rejectFunction = rejectFunction;}// 执行任务public void execute(Runnable runnable) {synchronized (workers) {if (workers.size() < capacity) {Worker worker = new Worker(runnable);log.debug("开始创建线程...{}", worker);workers.add(worker);worker.start();} else {
//                blockQueue.put(runnable);blockQueue.tryPut(rejectFunction, runnable);}}}class Worker extends Thread{private Runnable runnable;public Worker(Runnable runnable) {this.runnable = runnable;}@Overridepublic void run() {//执行任务// 1) 当任务不为空是,执行任务// 2) 当任务执行完毕,接着从队列中获取任务执行//这里有赋值不用上锁,是因为poll跟take实现都加了锁
//            while ((runnable != null) || (runnable = blockQueue.take()) != null) {while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行。。。{}", runnable);runnable.run();} catch (Exception e) {throw new RuntimeException(e);} finally {runnable = null;}}synchronized (workers) {log.debug("{} 被移除了", this);workers.remove(this);}}}
}@Slf4j(topic = "c.BlockQueue")
class BlockQueue<T> {// 队列大小private int capacity;//存放队列private Deque<T> deque = new ArrayDeque<>();// 一把锁private ReentrantLock lock = new ReentrantLock();// 等待条件private Condition fullCondition = lock.newCondition();private Condition emptyCondition = lock.newCondition();public BlockQueue(int capacity) {this.capacity = capacity;}public int size() {return deque.size();}// 获取任务 设置超时时间public T poll(long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.isEmpty()) {try {if (nanos <= 0) {log.debug("超时结束");return null;}//进行等待nanos = emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 获取任务public T take() {try {lock.lock();while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}
//            log.debug("删除队列头...");return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 存放任务public void put(T runnable) {try {lock.lock();while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();} finally {lock.unlock();}}// 带超时时间的存放任务public boolean offer(T runnable, long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {if (nanos <= 0) {log.debug("等待加入任务队列超时 {} ...", runnable);return false;}nanos = fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();return true;} finally {lock.unlock();}}public void tryPut(RejectFunction<T> rejectFunction , T runnable) {lock.lock();try {if (deque.size() == capacity) {rejectFunction.reject(this, runnable);} else {log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();}} finally {lock.unlock();}}
}

二、ThreadPoolExecutor

1)线程池状态

ThreadPoolExecutor 使用 int 的高3位来表示线程池状态,低29位表示线程数量。

从数字上比较 RUNNING > SHUTDOWN > STOP > TIDYING > TERMINATED

为什么111 比 00小,因为是一个整数的高3位,所以第一位为1表示负数。

问题:

为什么使用一个整数表示线程池状态和线程数量,不用两个变量来表示呢?

答:

因为要保证在对 线程池状态 和 线程数量 进行赋值操作时的原子性

那么他存在一个变量中,就只需要一次cas操作就可以保证原子性,既可以保存状态信息,也可以保存数量信息。

如果存在两个变量中,就需要两次cas操作才可以保证原子性,才可以对两个变量进行赋值操作。

2)构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

假设 核心线程= 2 救急线程=1 阻塞队列=2

  • 一开始任务阻塞队列空闲,线程也空闲
  • 任务1 创建 核心线程1 执行
  • 任务2 创建 核心线程2 执行
  • 核心线程满了,任务3 进入 阻塞队列
  • 核心线程满了,任务4 进入 阻塞队列
  • 核心线程满了, 阻塞队列满了,任务5 创建 救急线程1 执行
  • 核心线程满了, 阻塞队列满了,救急线程满了,任务6执行拒绝策略

救急线程空闲一定时间会自动销毁,等待下次用到在创建

核心线程一直运行。

触发救急线程创建的前提 是配合有界队列来使用

如果线程到达 最大线程数量 仍然有新的任务,这时会执行拒绝策略。

拒绝策略 jdk 提供了 4 种实现,其他著名框架也提供了实现。

3)newFixedThreadPool (固定大小线程池)

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
//可以给第二参数,传线程工厂,提供创建新线程的名字。

特点:

  • 核心线程 == 最大线程数 (没有救急线程),因此无需超时时间
  • 阻塞队列是无界的,没有指定大小,可以放任务数量任务

评价

适用于任务量已知,相对耗时的任务

4)newCachedThreadPool (带缓冲线程池)

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
}

特点

  • 核心线程为0,最大线程为Integer.MAX_VALUE,救急线程的空闲生存时间 60s
    • 也就是全部都是救急线程(60s后可以回收)
    • 救急线程可以无限创建
  • 队列采用 SynchronousQueue ,特点是没有容量,没有线程来获取的时候放不进去任务(类似于 一手交钱、一手交货)
  • 刚好当前线程池全部都是救急线程,每个任务都会创建一个新的线程来获取任务,合适

评价

整个线程池会不断创建新的线程,任务量有多少线程就会创建多少,然后空闲60s后释放线程

适用于任务量比较密集,但是任务执行时间比较短的情况

5)newSingleThreadExecutor (单线程线程池)

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

使用场景:

希望多个任务串行执行。线程数固定为1,当任务数量多于1时,会存放到阻塞队列中(无界)。任务执行完毕,线程也不会被摧毁释放。

那么跟 固定大小线程池设置成1 或者 自己创建一个线程执行 有什么区别

  • 自己创建一个线程执行,如果中间发生了异常,那么线程销毁了,阻塞队列中的任务就不执行了,没有什么补救措施,但是如果单线程线程池还会创建一个新的线程,保证正常工作。
  • newFixedThreadPool(1) 固定大小线程池设置成1,返回的对象还是可以修改的
    • 返回的对象还是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize 等方法修改。
  • 单线程线程池 newSingleThreadExecutor() 始终线程数量为1,不能修改
    • 返回的对象 通过new FinalizableDelegatedExecutorService进行了包装,应用了一种装饰器模式,只对外暴露 ExecutorService 接口, 因此不能调用 ThreadPoolExecutor 中特有方法修改。

6)提交任务

1. ALL

2. submit 演示

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool = Executors.newFixedThreadPool(2);//带有返回结果的submit执行,可以使用lambda表达式Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("执行");Thread.sleep(1000);return "ok";}});//获取值,会等待submit执行完,相当于保护性暂停模式log.debug("{}", future.get());}
}

3. invokeAll 演示

import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2Show {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool = Executors.newFixedThreadPool(3);// 调用 invokeAll 执行一个链表中的所有任务,然后将所有返回值收集成一个list 类型是Future// 可以设置超时时间, 如果规定时间内没有执行完所有任务,直接终止List<Future<Object>> futures = pool.invokeAll(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);return "1";},() -> {log.debug("begin");Thread.sleep(500);return "2";},() -> {log.debug("begin");Thread.sleep(2000);return "3";}));// 便利所有Future,等待所有执行完毕之后输出// 如果线程数量是2 任务3进入队列等 所以等待时间 500 + 2000// 如果线程数量是3 任务3直接与信工 等待时间 2000futures.forEach(f -> {try {log.debug("{}", f.get());} catch (ExecutionException | InterruptedException e) {throw new RuntimeException(e);}});}
}

4. invokeAny 演示

import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool = Executors.newFixedThreadPool(3);// 调用 invokeAny 执行一个链表中的所有任务,然后将执行最快的一个任务的返回值返回,然后其他任务不执行了// 如果只有一个线程, 那么只有第一个任务被执行,所以只会返回第一个任务的结果String result = pool.invokeAny(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);log.debug("end");return "1";},() -> {log.debug("begin");Thread.sleep(500);log.debug("end");return "2";},() -> {log.debug("begin");Thread.sleep(2000);log.debug("end");return "3";}));//最后拿到 的是2  然后1 跟 3的end没输出log.debug("{}", result);}
}

7)关闭线程池

shutdown

showdownNow

其他方法

三、设计模式 之 异步模式 之 工作线程

1) 定义

  • 让有限的工作线程(Worker Thread)可以轮流异步处理无限多的任务,也可以归类为分工模式。
  • 他的典型实现是 线程池,体现了经典设计模式中的享元模式。

例如:餐厅的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客户分配一位服务员,那么成本就太高了

(对比另外一种多线程设计模式:Thread-Pre-Message 这个是一个任务创建一个新线程处理)

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。

例如:如果一个餐厅的工人既要招呼客人(任务类型A),又要后厨做菜(任务类型B) 显然效率很低,因为他全都要干,

这时候如果分成招呼客人一批人(线程池A),做菜一批人(线程池B)更为合理,更细致的分工效率跟更高。

2)饥饿

这里的饥饿跟 synchronized 那里的饥饿有点区别,这里不是因为锁导致的,而是因为线程不足导致的,现象类似于死锁,但是不是锁导致的。

一般是固定大小线程池有饥饿现象。

  • 有两个工人在同一个线程池中表示两个线程。
  • 两个工人是全能的,既能做饭,又能处理点餐,有以下两种阶段情景
    • 1、有一个客人点餐,工人A处理点餐,等待上菜,工人B做菜,工人A上菜,一气呵成,分工合作很顺利
    • 2、有两个客人同时点餐,工人A和工人B同时处理点餐,同时等待上菜,没有额外工人(线程)去做菜,这时候就死锁了。

情况1


import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j(topic = "c.pool")
public class Test3Executor {static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);// 有人点餐,执行任务pool.execute(() -> {log.debug("点餐");Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:17:54.256 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:17:54.261 [pool-1-thread-2] DEBUG c.pool - 做菜
// 21:17:55.262 [pool-1-thread-1] DEBUG c.pool - 做完红烧牛肉1

情况2:

import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j(topic = "c.pool")
public class Test3Executor {static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);// 有人点餐,执行任务pool.execute(() -> {log.debug("点餐");Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});// 有人点餐,执行任务pool.execute(() -> {log.debug("点餐");Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:19:36.930 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:19:36.930 [pool-1-thread-2] DEBUG c.pool - 点餐

3)解决饥饿

解决1(不全面):

  • 增加核心线程来解决,但是如果同时比较多,还是没有办法解决。

解决2(全面):

  • 不同功能的线程放到不同线程池
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j(topic = "c.pool")
public class Test3Executor {static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");public static void main(String[] args) {ExecutorService cookPool = Executors.newFixedThreadPool(1);ExecutorService waitPool = Executors.newFixedThreadPool(1);// 有人点餐,执行任务waitPool.execute(() -> {log.debug("点餐1");Future<String> future = cookPool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜1");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完1{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});// 有人点餐,执行任务waitPool.execute(() -> {log.debug("点餐2");Future<String> future = cookPool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜2");Thread.sleep(1000);return list.get(2);}});try {log.debug("做完2{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:35:52.924 [pool-2-thread-1] DEBUG c.pool - 点餐1
// 21:35:52.924 [pool-2-thread-2] DEBUG c.pool - 点餐2
// 21:35:52.927 [pool-1-thread-1] DEBUG c.pool - 做菜1
// 21:35:53.928 [pool-1-thread-1] DEBUG c.pool - 做菜2
// 21:35:53.928 [pool-2-thread-1] DEBUG c.pool - 做完1红烧牛肉1
// 21:35:54.930 [pool-2-thread-2] DEBUG c.pool - 做完2牛肉3

4)创建多少线程池合适?

  • 过小会导致不能充分利用cpu系统资源,容易导致饥饿
  • 过大会导致频繁发生线程上下文切换,占用更多内存

1. CPU 密集型运算

通常采用 cpu核数 + 1 能够实现最优的CPU利用率, +1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,

额外的这个线程能够顶上去,保证CPU时钟周期不被浪费。常用于数据分析时。

2. I/O 密集型运算

CPU 不总是在繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但是当你执行I/O操作时、远程RPC调用、数据库操作时等,这时候CPU资源就空闲了

你可以利用多线程提高他的利用率。

经验公式如下:

  • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间 +等待时间) / CPU 计算时间

例如

4 核CPU计算时间是50%,其他等待时间是50%,期望cpu被100%利用, 套用公式

4 * 100% * 100% / 50% = 8

四、任务调度线程池

有些时候我们需要延迟执行任务(延迟几秒后执行),或者需要循环执行任务(每隔几秒执行一次),这时候就需要用到任务调度的线程池了。

JDK5的时候加入任务调度线程池

JDK5之前 可以使用java.util.Timer 来实现定时功能

1. Timer

优点:简单易用,

缺点:所有任务都由一个独立线程执行,所有任务都是串行执行的,如果中间有一个任务发生异常,后面的任务也不会运行了。

@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {Timer timer = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");try {Thread.sleep(0);} catch (InterruptedException e) {throw new RuntimeException(e);}}};log.debug("start");// 一秒后执行 task1 task2timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
}
// 16:17:24.730 [main] DEBUG c.pool - start
// 16:17:25.734 [Timer-0] DEBUG c.pool - task 1
// 16:17:27.735 [Timer-0] DEBUG c.pool - task 2

2. ScheduledExecutorService

解决了Timer的缺点,抛出异常不会终止其他任务,还可以设置多个线程参与调度。

如果ScheduledExecutorService(1) 设置成1,那么也是串行执行,但是不会终止其他任务

a. 延迟任务
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);pool.schedule(() -> {log.debug("task1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, TimeUnit.SECONDS);pool.schedule(() -> {log.debug("task2");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, TimeUnit.SECONDS);}
}
// 16:23:45.724 [pool-1-thread-1] DEBUG c.pool - task1
// 16:23:45.724 [pool-1-thread-2] DEBUG c.pool - task2
b. 定时任务
ⅰ. scheduleAtFixedRate
@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start");// 延迟1秒执行,然后每隔1秒执行一次// 如果任务的延迟比定时还长,那么所有任务会在上一个任务执行完之后 紧接着运行pool.scheduleAtFixedRate(() -> {log.debug("task1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, 1, TimeUnit.SECONDS);}
}
// 16:33:06.263 [main] DEBUG c.pool - start
// 16:33:07.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:09.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:11.303 [pool-1-thread-1] DEBUG c.pool - task1
ⅱ. scheduleWithFixedDelay
@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start");// 不管任务执行多长时间,都会等他工作完成之后再延迟一秒执行一次pool.scheduleWithFixedDelay(() -> {log.debug("task1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, 1, TimeUnit.SECONDS);}
}
// 16:35:55.014 [main] DEBUG c.pool - start
// 16:35:56.048 [pool-1-thread-1] DEBUG c.pool - task1
// 16:35:59.051 [pool-1-thread-1] DEBUG c.pool - task1
// 16:36:02.051 [pool-1-thread-1] DEBUG c.pool - task1

正确处理线程池异常

  • try catch 手动捕捉
  • 普通线程池可以使用配合feture的返回值get()等待,如果有异常,返回值会是异常描述

3. 线程池应用

如何让每周日 18:00:00 定时执行任务?

import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class TestTaskRun {// 如何让每周日 18:00:00 定时执行任务?public static void main(String[] args) {// 获取当前时间LocalDateTime now = LocalDateTime.now();System.out.println("now = " + now);// 获取本周周日时间LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);// 如果当前时间 > 本周周四,那么获取的time是不对的,需要是下周的if (now.isAfter(time)) {time = time.plusWeeks(1);}System.out.println("time = " + time);long period = 1000 * 60 * 60 * 24 * 7; // 一周时间long initialDelay = Duration.between(now, time).toMillis();  //获取两个时间之间的毫秒差值ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.scheduleAtFixedRate(() -> {System.out.println("running...");}, initialDelay, period, TimeUnit.MILLISECONDS);}
}

五、Tomcat 线程池

浏览器发出一个请求

首先会经过LimitLatch(作用是限流,控制最大连接数,防止太多连接把服务器压垮),类似于J.U.C 中的 Semaphore 后面再讲

当没有超过最大连接数,到达第二个组件 Acceptor(本质上是一个线程,不断循环看有没有新的连接,主要就是负责接受socket连接,其他不管)

然后Poller (也是一个线程,死循环看连接上是否有这种可读的事件发生,但是也是负责监听是否有,如果有交给一个Executor来执行)来负责看是否有IO的读写操作

一旦Poller 发现可读,会封装一个任务对象(socketProcessor实现了runnable),提交个Executor线程池处理

Executor 线程池中的工作线程(核心救急线程模式)最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor ,行为稍有不同

  • 如果总线程数达到 最大线程数 并且队列也满了
    • 这时不会立刻抛出 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat- 7.0.42

tomcat 线程池配置

tomcat 线程池相对于 java原生线程池,tomcat进行了一些改进,主要体现在核心在于Tomcat自己定义的任务队列。

TaskQueue —— offer(Runnable o)方法

public class TaskQueue extends LinkedBlockingQueue<Runnable> {private transient volatile ThreadPoolExecutor parent = null;public TaskQueue() {super();}public void setParent(ThreadPoolExecutor tp) {parent = tp;}@Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) {return super.offer(o);}//we are maxed out on threads, simply queue the object// 如果线程数量已经达到最大数量,则进入队列等待执行if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {return super.offer(o);}//we have idle threads, just add it to the queue// 执行到这里 最大线程数 > 当前线程数 > 核心线程数。// 如果提交的任务数 <= 当前线程数则说明存在空闲线程,则提交到任务队列,等待执行if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {return super.offer(o);}//if we have less threads than maximum force creation of a new thread// 执行到这,说明提交的任务数已经大于当前线程数,需要创建新的线程if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {return false;}//if we reached here, we need to add it to the queuereturn super.offer(o);}
}
  • submittedCount:Tomcat自定义的ThreadPoolExecutor中使用该字段来标记当前已提交的任务数,提交加一,执行完减一。
  • poolSizeNoLock:当前执行的线程数

  • 判断当前是否需要创建空闲线程执行任务,而不是全放入队列,因为LinkedBlockingQueue默认是无界的,但也可以设计最大数量;

六、Fork / Join线程池

1)概念

  • JDK 1.7 之后加入的新的线程池实现, 使用了分治思想,适用于能够进行任务拆分的 cpu 密集型运算。
  • 所谓任务拆分,是一个大任务拆分算法上相同的小任务,直到不能再拆分然后求解,最后合并,类似于归并排序,裴波那契数列等思想
  • 分治思想的基础上加入了多线程,每个任务的分解和合并交给不同线程执行,进一步提升运算效率。
  • 默认会创建与 cpu 核数大小相同的线程池。

2)使用

1. 拆分不完善(导致串行)

package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool poll = new ForkJoinPool(4);// 运行调用很简单System.out.println(poll.invoke(new MyTask(5)));}
}@Slf4j(topic = "c.MyTask")
// 计算机1 - n之间的和
class MyTask extends RecursiveTask<Integer> {private int n;public MyTask(int n) {this.n = n;}@Overridepublic String toString() {return "MyTask{" +"n=" + n +'}';}@Overrideprotected Integer compute() {// 递归结束条件if (n == 1) {log.debug("join() {}", n);return 1;}//分解成子问题MyTask myTask = new MyTask(n - 1);myTask.fork();   //分配线程执行log.debug("fork() {} + {}", n, myTask);//获取线程返回结果Integer join = myTask.join();log.debug("join() {} + {} = {}", n, myTask, n + join);return n + join;  //然后拼接子问题返回答案}
}
// 21:07:44.838 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - fork() 5 + MyTask{n=4}
// 21:07:44.838 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - fork() 4 + MyTask{n=3}
// 21:07:44.838 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - fork() 2 + MyTask{n=1}
// 21:07:44.838 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - fork() 3 + MyTask{n=2}
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 1
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 2 + MyTask{n=1} = 3
// 21:07:44.842 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - join() 3 + MyTask{n=2} = 6
// 21:07:44.842 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - join() 4 + MyTask{n=3} = 10
// 21:07:44.842 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - join() 5 + MyTask{n=4} = 15
// 15

2. 分治思想(并行)

package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool poll = new ForkJoinPool(4);// 运行调用很简单System.out.println(poll.invoke(new AddTask(1, 5)));}
}
@Slf4j(topic = "c.AddTask")
// 计算机1 - n之间的和
class AddTask extends RecursiveTask<Integer> {private int start;private int end;public AddTask(int start, int end) {this.start = start;this.end = end;}@Overridepublic String toString() {return "AddTask{" +"start=" + start +", end=" + end +'}';}@Overrideprotected Integer compute() {// 递归结束条件if (start == end) {log.debug("join() {}", start);return start;}if (end == start + 1) {log.debug("join() {}", end + start);return end + start;}//分解成子问题int mid = (start + end) / 2;AddTask addTask1 = new AddTask(start, mid);addTask1.fork();   //分配线程执行log.debug("fork()  {}", addTask1);AddTask addTask2 = new AddTask(mid + 1, end);addTask2.fork();   //分配线程执行log.debug("fork()  {}", addTask2);//获取线程返回结果Integer join = addTask1.join() + addTask2.join();log.debug("join() {} , {} = {}", start, end, join);return join;  //然后拼接子问题返回答案}
}

七、AQS原理

1)概述

2)自定义锁

自定义一个同步器组件,例如可重入锁那些都是这么做。在你自定义的同步器组件内部定义一个AQS类的内部类,所有操作交给这个内部类完成,我们的自定义同步组件只需要调用这个内部类的方法

这里要注意,acquire()是AQS实现的模板方法可以直接用,tryAquire()是自定义的加锁方法, 模板会调用此自定义实现方法

它的acquire、release方法是aqs的固定机制,比如怎么把线程阻塞,怎么加入队列等等,而tryxx是自己实现的,自己决定是否可重入,是否可共享等等

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;@Slf4j(topic = "c.MyLock")
public class Test6Lock {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(() -> {lock.lock();log.debug("lock");lock.lock();log.debug("lock");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {log.debug("unlock");lock.unlock();}}).start();
//        new Thread(() -> {
//            lock.lock();
//            log.debug("lock");
//            try {
//                Thread.sleep(1);
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            } finally {
//                log.debug("unlock");
//                lock.unlock();
//            }
//        }).start();}
}// 自定义锁 (不可重入锁)
class MyLock implements Lock {class MyAQS extends AbstractQueuedSynchronizer {// 独占锁  同步器类@Overrideprotected boolean tryAcquire(int arg) {// cas 操作是否可以获得锁if (compareAndSetState(0, 1)) {//然后锁的Owner 是当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {// 释放锁之后清空OwnersetExclusiveOwnerThread(null);// 因为是volatile 所以因为写屏障,所以前面不会重排指令到state后面,同步到主存setState(0);return true;}@Override  //是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition() {return new ConditionObject();}}MyAQS sync = new MyAQS();@Override  //加锁 (加锁失败,进入队列)public void lock() {sync.acquire(1);}@Override  // 加锁,但是可以打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Override // 尝试加锁(一次)public boolean tryLock() {return sync.tryAcquire(1);}@Override  // 尝试加锁,带超时public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Override  //释放锁public void unlock() {sync.release(1);  //释放锁,底层还会唤醒等待的线程}@Override   //创建条件变量public Condition newCondition() {return sync.newCondition();}
}

八、ReentrantLock 原理

1)加锁成功流程

非公平锁实现原理:

加锁解锁流程:

构造器方法,默认是非公平锁实现

public ReentrantLock() {sync = new NonfairSync();
}

NonfairSync 继承自 AQS,原理类似于刚才的AQS实现自定义锁

2)加锁失败流程

第一次尝试:如果已经被别人加锁了,调用acquire

第二次尝试:如果这时候加锁的人已经释放,则不需要继续往下,否则就创建AddWaiter构造一个链表(即head、tail),添加一个Node链表节点,加入队列

  • 首次创建会创建两个,第一个Node称为 Dummy(哑元)或哨兵,用来占位,并不关联线程。
  • 黄色三角形表示该Node的 waitStatus 状态,默认 0 为正常状态

进入acquireQueue () 逻辑

第三个尝试:循环判断当前节点的前驱节点是不是头结点,说明他是第一个节点,有资格去再次尝试获取锁

进入 shouldParkAfterFailAcquire()逻辑

把前驱节点改成状态改成 -1,代表前驱节点有资格唤醒后继结点,因为如果进行park ,需要有人唤醒,第一次会方法会返回false

第四次尝试:再次尝试获取,获取失败,再次进入shouldParkAfterFailAcquire,因为前驱节点node已经是状态-1,所以这次返回true,进行一个park,阻塞住

3)解锁竞争成功流程

如果有多个线程经历上述过程竞争失败,变成这个样子

这时候 Thread - 0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

然后判断头结点是不是不为空,并且 waitStatus 状态不为0,那么就执行 unparkSuccessor 唤醒距离头结点最近的节点,然后他有资格继续尝试获取锁

获取到锁之后,设置state = 1, 并且 exclusiveOwnerThread = 当前线程,然后将头结点替换成当前节点 】

4)解锁竞争失败流程

因为是非公平锁,此时有另外一个线程Thread4,不是队列中的,而是第一次访问的线程,那么就会发生一个竞争

Thread-1 如果竞争失败,Thread-4 的线程就会设置成 exclusiveOwnerThread ,state = 1

Thread - 1 竞争失败再次进入 acquireQueue 流程,获取锁失败,重新进入 park 阻塞

公平锁的意思也就是 新线程来的时候会不会直接加入队列中。

5)锁重入原理

6)可打断原理

不可打断锁原理:

打断不会影响到获取锁停止,只有获得锁之后才会返回打断标记

可打断原理:

如果打断之后直接抛出异常,不会继续获取锁了

7)公平锁原理

公平锁原理:新线程进来获取锁的时候,判断是不是队列中有线程等待,来判断是否可以进行竞争锁。

8)条件变量 - await

每个条件变量其实对应着一个等待队列, 其实现类是 ConditionObject

开始Thread-0持有锁,调用await,进入 COnditionObject 的 addConditionWaiter 流程

创建 新的 Node 状态为 -2(Node.CONDITION) ,关联 Thread-0,加入等待队列尾部

然后进入 AQS 的fullyRelease 流程, 释放同步器上的锁,也就是当前线程的所有锁

为什么是fullyRelease 不是 因为可能发生了锁重入,所以需要释放所有锁

state = 0, exclusiveOwnerThread = null

然后unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread - 1 竞争成功。

state = 1, exclusiveOwnerThread = thread1

9)条件变量 - signal

假设 Thread- 1 调用 signal 要来唤醒 条件变量中等待的 Thread - 0

调用 signal 方法,然后唤醒 条件变量中的 第一个节点,然后条件变量中的当前点 置为null,

转移成功 然后加入到等待队列队尾等待

转移失败 因为有些时候有时限的等待超时,或者被打断,那么就没有必要加入到队列队尾

加入队列,获取前一个节点,然后设置state 为 -1,因为最后一个是0(前一个节点设置成-1,表示有资格唤醒unpark新加入的线程)

如果前一个节点的状态 > 0 ,表示被取消了,那么需要唤醒当前线程,去找能够当做前驱节点的点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/661224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【bitonicSort学习】

bitonicSort学习 什么是Bitonic Sort核心 什么是Bitonic Sort https://zhuanlan.zhihu.com/p/53963918 这个是用来并行排序的一个操作 之前学过一些CPU排序&#xff0c;快排 冒泡 归并啥的&#xff0c;有一些能转成并行&#xff0c;有一些不适合 像快排这种二分策略就可以考虑…

Vue3的自定义指令怎么迁移到nuxt3

一、找到Vue3中指令的源码 const DISTANCE 100; // 距离 const ANIMATIONTIME 500; // 500毫秒 let distance: number | null null,animationtime: number | null null; const map new WeakMap(); const ob new IntersectionObserver((entries) > {for (const entrie…

草图导入3d后模型贴材质的步骤?---模大狮模型网

3D模型在导入草图大师后出现混乱可能有多种原因&#xff0c;以下是一些可能的原因和解决方法&#xff1a; 模型尺寸问题&#xff1a;如果3D模型的尺寸在导入草图大师时与画布尺寸不匹配&#xff0c;可能导致模型混乱。解决方法是在3D建模软件中调整模型的尺寸&#xff0c;使其适…

FreeRTOS使用计数信号量进行任务同步与资源管理

FreeRTOS使用计数信号量进行任务同步与资源管理 介绍 在多任务系统中&#xff0c;任务之间的同步和对共享资源的管理是非常重要的。FreeRTOS 提供了丰富的同步机制&#xff0c;其中计数信号量是一种强大的工具&#xff0c;用于实现任务之间的同步和对资源的访问控制。 什么是…

figure方法详解之清除图形内容

figure方法详解之清除图形内容 一 clf():二 clear():三 clear()方法和clf()方法的区别&#xff1a; 前言 Hello 大家好&#xff01;我是甜美的江。 在数据可视化中&#xff0c;Matplotlib 是一个功能强大且广泛使用的库&#xff0c;它提供了各种方法来创建高质量的图形。在 Mat…

unity 拖入文件 窗口大小

目录 unity 拖入文件插件 设置窗口大小 unity 拖入文件插件 GitHub - Bunny83/UnityWindowsFileDrag-Drop: Adds file drag and drop support for Unity standalong builds on windows. 设置窗口大小 file build

Iceberg从入门到精通系列之二十一:Spark集成Iceberg

Iceberg从入门到精通系列之二十一&#xff1a;Spark集成Iceberg 一、在 Spark 3 中使用 Iceberg二、添加目录三、创建表四、写五、读六、Catalogs七、目录配置八、使用目录九、替换会话目录十、使用目录特定的 Hadoop 配置值十一、加载自定义目录十二、SQL 扩展十三、运行时配置…

电子电器架构——车载网关转发buffer心得汇总

电子电器架构——车载网关转发buffer心得汇总 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力…

vue2父组件向子组件传值时,子组件同时接收多个数据类型,控制台报警的问题

最近项目遇到一个问题,就是我的父组件向子组件(公共组件)传值时,子组件同时接收多个数据类型,控制台报警的问题,如下图,子组件明明写了可同时接收字符串,整型和布尔值,但控制台依旧报警: 仔细检查父组件,发现父组件是这样写的: <common-tabletooltip :content=…

2024 springboot Mybatis-flex 打包出错

Mybatis-flex官网&#xff1a;快速开始 - MyBatis-Flex 官方网站 从 Mybatis-flex官网获取模板后&#xff0c;加入自己的项目内容想打包确保错&#xff0c;先试试一下方法 这里改成skip的默认是true改成false&#xff0c;再次打包就可以了

Git系列---标签管理

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 目录 1.理解标签2.创建标签…

MySQL原理(一)架构组成之物理文件组成

目录 一、日志文件 1、错误日志 Error Log 1.1、作用&#xff1a; 1.2、开启关闭&#xff1a; 1.3、使用 2、二进制日志 Binary Log & Binary Log Index 2.1、作用&#xff1a; 2.2、开启关闭&#xff1a; 2.3、Binlog还有一些附加选项参数 &#xff08;1&#x…

verilog编程之乘法器的实现

知识储备 首先来回顾一下乘法是如何在计算机中实现的。 假设现在有两个32位带符号定点整数x和y&#xff0c;我们现在要让x和y相乘&#xff0c;然后把乘积存放在z中&#xff0c;大家知道&#xff0c;两个32位数相乘&#xff0c;结果不会超过64位&#xff0c;因此z的长度应该为64…

大数据开发之离线数仓项目(3数仓数据同步策略)(可面试使用)

第 1 章&#xff1a;实时数仓同步数据 实时数仓由flink源源不断从kafka当中读数据计算&#xff0c;所以不需要手动同步数据到实时数仓。 第 2 章&#xff1a;离线数仓同步数据 2.1 用户行为数据同步 2.1.1 数据通道 用户行为数据由flume从kafka直接同步到hdfs&#xff0c;…

通俗易懂理解通道注意力机制(CAM)与空间注意力机制(SAM)

重要说明&#xff1a;本文从网上资料整理而来&#xff0c;仅记录博主学习相关知识点的过程&#xff0c;侵删。 一、参考资料 通道注意力&#xff0c;空间注意力&#xff0c;像素注意力 通道注意力机制和空间注意力机制 视觉 注意力机制——通道注意力、空间注意力、自注意力…

Linux:进程信号的概念与产生原理

文章目录 信号的概念实践信号关于前台和后台进程的操作 操作系统与外设信号的产生signal系统调用 前面的篇章结束了信号量的话题&#xff0c;那么接下来引入的是信号的话题&#xff0c;信号和信号量之间没有任何关系&#xff0c;只是名字比较像 信号的概念 在生活中存在各种各…

Java学习day24:线程的同步和锁(例题+知识点详解)

声明&#xff1a;该专栏本人重新过一遍java知识点时候的笔记汇总&#xff0c;主要是每天的知识点题解&#xff0c;算是让自己巩固复习&#xff0c;也希望能给初学的朋友们一点帮助&#xff0c;大佬们不喜勿喷(抱拳了老铁&#xff01;) 往期回顾 Java学习day23&#xff1a;线程构…

Matlab图像模拟加噪——高斯噪声、椒盐噪声、泊松噪声、乘性噪声、均匀噪声、指数噪声

1.高斯噪声 (1)通过均值和方差来产生 Jimnoise(I, gaussian, 0, 0.01);%高斯噪声&#xff0c;均值为0&#xff0c;方差为0.01(2)通过位置信息来产生 Iim2double(I); Vzeros(size(I)); %建立矩阵V for i1:size(V, 1)V(i,:)0.02*i/size(V,1); end Jimnoise(I, localvar, V); …

Linux安装aria2出现No package aria2 available.的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

牛客,OR36 链表的回文结构,快慢指针和反转链表的实践

链表的回文结构_牛客题霸_牛客网 (nowcoder.com) 还是比较简单的&#xff0c;主要分为三个步骤&#xff0c;两种需掌握的函数实现 目录 主要思路过程&#xff0c;1&#xff0c;找到中间结点&#xff0c;2&#xff0c;反转中间结点往后的结点&#xff0c;3&#xff0c;遍历比…