百日筑基第十六天-java多线程编程浅学一下4-各种线程池学习和使用

百日筑基第十六天-java多线程编程浅学一下4-各种线程池学习和使用

使用线程池

Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。

如果可以复用一组线程:

┌─────┐ execute  ┌──────────────────┐
│Task1│─────────>│ThreadPool        │
├─────┤          │┌───────┐┌───────┐│
│Task2│          ││Thread1││Thread2││
├─────┤          │└───────┘└───────┘│
│Task3│          │┌───────┐┌───────┐│
├─────┤          ││Thread3││Thread4││
│Task4│          │└───────┘└───────┘│
├─────┤          └──────────────────┘
│Task5│
├─────┤
│Task6│
└─────┘...

那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。

简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池;
  • SingleThreadExecutor:仅单线程执行的线程池。

创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:

import java.util.concurrent.*;public class Main {public static void main(String[] args) {// 创建一个固定大小的线程池:ExecutorService es = Executors.newFixedThreadPool(4);for (int i = 0; i < 6; i++) {es.submit(new Task("" + i));}// 关闭线程池:es.shutdown();}
}class Task implements Runnable {private final String name;public Task(String name) {this.name = name;}@Overridepublic void run() {System.out.println("start task " + name);try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println("end task " + name);}
}

我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。

如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

因此,想创建指定动态范围的线程池,可以这么写:

int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

ScheduledThreadPool

还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。

创建一个ScheduledThreadPool仍然是通过Executors类:

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务以固定的3秒为间隔执行,我们可以这样写:

// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

│░░░░   │░░░░░░ │░░░    │░░░░░  │░░░  
├───────┼───────┼───────┼───────┼────>
│<─────>│<─────>│<─────>│<─────>│

而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

│░░░│       │░░░░░│       │░░│       │░
└───┼───────┼─────┼───────┼──┼───────┼──>│<─────>│     │<─────>│  │<─────>│

因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。

细心的童鞋还可以思考下面的问题:

  • 在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?
  • 如果任务抛出了异常,后续任务是否继续执行?

Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer

小结

JDK提供了ExecutorService实现了线程池功能:

  • 线程池内部维护一组线程,可以高效执行大量小任务;
  • Executors提供了静态方法创建不同类型的ExecutorService
  • 必须调用shutdown()关闭ExecutorService
  • ScheduledThreadPool可以定期调度多个任务。

使用Future

在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行:

class Task implements Runnable {public String result;public void run() {this.result = longTimeCalculation(); }
}

Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:

class Task implements Callable<String> {public String call() throws Exception {return longTimeCalculation(); }
}

并且Callable接口是一个泛型接口,可以返回指定类型的结果。

现在的问题是,如何获得异步执行的结果?

如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果

一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

小结

对线程池提交一个Callable任务,可以获得一个Future对象;

可以用Future在将来某个时刻获取结果。

使用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

我们以获取股票价格为例,看看如何使用CompletableFuture

public class Main {public static void main(String[] args) throws Exception {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);// 如果执行成功:cf.thenAccept((result) -> {System.out.println("price: " + result);});// 如果执行异常:cf.exceptionally((e) -> {e.printStackTrace();return null;});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static Double fetchPrice() {try {Thread.sleep(100);} catch (InterruptedException e) {}if (Math.random() < 0.3) {throw new RuntimeException("fetch price failed!");}return 5 + Math.random() * 20;}
}

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

public interface Supplier<T> {T get();
}

这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。

紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:

public interface Consumer<T> {void accept(T t);
}

异常时,CompletableFuture会调用Function对象:

public interface Function<T, R> {R apply(T t);
}

这里我们都用lambda语法简化了代码。

可见CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

public class Main {public static void main(String[] args) throws Exception {// 第一个任务:CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油");});// cfQuery成功后继续执行下一个任务:CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {return fetchPrice(code);});// cfFetch成功后打印结果:cfFetch.thenAccept((result) -> {System.out.println("price: " + result);});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(2000);}static String queryCode(String name) {try {Thread.sleep(100);} catch (InterruptedException e) {}return "601857";}static Double fetchPrice(String code) {try {Thread.sleep(100);} catch (InterruptedException e) {}return 5 + Math.random() * 20;}
}

同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

public class Main {public static void main(String[] args) throws Exception {// 两个CompletableFuture执行异步查询:CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油", "https://finance.sina.com.cn/code/");});CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油", "https://money.163.com/code/");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);// 两个CompletableFuture执行异步查询:CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {return fetchPrice((String) code, "https://finance.sina.com.cn/price/");});CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {return fetchPrice((String) code, "https://money.163.com/price/");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);// 最终结果:cfFetch.thenAccept((result) -> {System.out.println("price: " + result);});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static String queryCode(String name, String url) {System.out.println("query code from " + url + "...");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {}return "601857";}static Double fetchPrice(String code, String url) {System.out.println("query price from " + url + "...");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {}return 5 + Math.random() * 20;}
}

上述逻辑实现的异步查询规则实际上是:

┌─────────────┐ ┌─────────────┐
│ Query Code  │ │ Query Code  │
│  from sina  │ │  from 163   │
└─────────────┘ └─────────────┘│               │└───────┬───────┘▼┌─────────────┐│    anyOf    │└─────────────┘│┌───────┴────────┐▼                ▼
┌─────────────┐  ┌─────────────┐
│ Query Price │  │ Query Price │
│  from sina  │  │  from 163   │
└─────────────┘  └─────────────┘│                │└────────┬───────┘▼┌─────────────┐│    anyOf    │└─────────────┘│▼┌─────────────┐│Display Price│└─────────────┘

除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

最后我们注意CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行。

小结

CompletableFuture可以指定异步处理流程:

  • thenAccept()处理正常结果;
  • exceptional()处理异常结果;
  • thenApplyAsync()用于串行化另一个CompletableFuture
  • anyOf()allOf()用于并行化多个CompletableFuture

使用ForkJoin

Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。

我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:

┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘

这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。

我们来看如何使用Fork/Join对大数据进行并行求和:

public class Main {public static void main(String[] args) throws Exception {// 创建2000个随机数组成的数组:long[] array = new long[2000];long expectedSum = 0;for (int i = 0; i < array.length; i++) {array[i] = random();expectedSum += array[i];}System.out.println("Expected sum: " + expectedSum);// fork/join:ForkJoinTask<Long> task = new SumTask(array, 0, array.length);long startTime = System.currentTimeMillis();Long result = ForkJoinPool.commonPool().invoke(task);long endTime = System.currentTimeMillis();System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");}static Random random = new Random(0);static long random() {return random.nextInt(10000);}
}class SumTask extends RecursiveTask<Long> {static final int THRESHOLD = 500;long[] array;int start;int end;SumTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start <= THRESHOLD) {// 如果任务足够小,直接计算:long sum = 0;for (int i = start; i < end; i++) {sum += this.array[i];// 故意放慢计算速度:try {Thread.sleep(1);} catch (InterruptedException e) {}}return sum;}// 任务太大,一分为二:int middle = (end + start) / 2;System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));SumTask subtask1 = new SumTask(this.array, start, middle);SumTask subtask2 = new SumTask(this.array, middle, end);invokeAll(subtask1, subtask2);Long subresult1 = subtask1.join();Long subresult2 = subtask2.join();Long result = subresult1 + subresult2;System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);return result;}
}

观察上述代码的执行过程,一个大的计算任务02000首先分裂为两个小任务01000和10002000,这两个小任务仍然太大,继续分裂为更小的0500,5001000,10001500,1500~2000,最后,计算结果被依次合并,得到最终结果。

因此,核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

class SumTask extends RecursiveTask<Long> {protected Long compute() {// “分裂”子任务:SumTask subtask1 = new SumTask(...);SumTask subtask2 = new SumTask(...);// invokeAll会并行运行两个子任务:invokeAll(subtask1, subtask2);// 获得子任务的结果:Long subresult1 = subtask1.join();Long subresult2 = subtask2.join();// 汇总结果:return subresult1 + subresult2;}
}

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

使用ThreadLocal

多线程是Java实现多任务的基础,Thread对象代表一个线程,我们可以在代码中调用Thread.currentThread()获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字:

public class Main {public static void main(String[] args) throws Exception {log("start main...");new Thread(() -> {log("run task...");}).start();new Thread(() -> {log("print...");}).start();log("end main.");}static void log(String s) {System.out.println(Thread.currentThread().getName() + ": " + s);}
}

对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

public void process(User user) {checkPermission();doWork();saveStatus();sendResponse();
}

然后,通过线程池去执行这些任务。

观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?

process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

public void process(User user) {checkPermission(user);doWork(user);saveStatus(user);sendResponse(user);
}

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

void doWork(User user) {queryStatus(user);checkStatus();setNewStatus(user);log();
}

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。

给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。

Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。

ThreadLocal实例通常总是以静态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

它的典型使用方式如下:

void processUser(user) {try {threadLocalUser.set(user);step1();step2();} finally {threadLocalUser.remove();}
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

void step1() {User u = threadLocalUser.get();log();printUser();
}void log() {User u = threadLocalUser.get();println(u.name);
}void step2() {User u = threadLocalUser.get();checkUser(u.id);
}

注意到普通的方法调用一定是同一个线程执行的,所以,step1()step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。

实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:

Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。

最后,特别注意ThreadLocal一定要在finally中清除:

try {threadLocalUser.set(user);...
} finally {threadLocalUser.remove();
}

这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。

为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {...}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:

public class UserContext implements AutoCloseable {static final ThreadLocal<String> ctx = new ThreadLocal<>();public UserContext(String user) {ctx.set(user);}public static String currentUser() {return ctx.get();}@Overridepublic void close() {ctx.remove();}
}

使用的时候,我们借助try (resource) {...}结构,可以这么写:

try (var ctx = new UserContext("Bob")) {// 可任意调用UserContext.currentUser():String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象

这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {...}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。

小结

ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

使用ThreadLocal要用try ... finally结构,并在finally中清除。

使用虚拟线程

虚拟线程(Virtual Thread)是Java 19引入的一种轻量级线程,它在很多其他语言中被称为协程、纤程、绿色线程、用户态线程等。

在理解虚拟线程前,我们先回顾一下线程的特点:

  • 线程是由操作系统创建并调度的资源;
  • 线程切换会耗费大量CPU时间;
  • 一个系统能同时调度的线程数量是有限的,通常在几百至几千级别。

因此,我们说线程是一种重量级资源。在服务器端,对用户请求,通常都实现为一个线程处理一个请求。由于用户的请求数往往远超操作系统能同时调度的线程数量,所以通常使用线程池来尽量减少频繁创建和销毁线程的成本。

对于需要处理大量IO请求的任务来说,使用线程是低效的,因为一旦读写IO,线程就必须进入等待状态,直到IO数据返回。常见的IO操作包括:

  • 读写文件;
  • 读写网络,例如HTTP请求;
  • 读写数据库,本质上是通过JDBC实现网络调用。

我们举个例子,一个处理HTTP请求的线程,它在读写网络、文件的时候就会进入等待状态:

Begin
────────
Blocking ──▶ Read HTTP Request
Wait...
Wait...
Wait...
────────
Running
────────
Blocking ──▶ Read Config File
Wait...
────────
Running
────────
Blocking ──▶ Read Database
Wait...
Wait...
Wait...
────────
Running
────────
Blocking ──▶ Send HTTP Response
Wait...
Wait...
────────
End

真正由CPU执行的代码消耗的时间非常少,线程的大部分时间都在等待IO。我们把这类任务称为IO密集型任务。

为了能高效执行IO密集型任务,Java从19开始引入了虚拟线程。虚拟线程的接口和普通线程是一样的,但是执行方式不一样。虚拟线程不是由操作系统调度,而是由普通线程调度,即成百上千个虚拟线程可以由一个普通线程调度。任何时刻,只能执行一个虚拟线程,但是,一旦该虚拟线程执行一个IO操作进入等待时,它会被立刻“挂起”,然后执行下一个虚拟线程。什么时候IO数据返回了,这个挂起的虚拟线程才会被再次调度。因此,若干个虚拟线程可以在一个普通线程中交替运行:

Begin
───────────
V1 Runing
V1 Blocking ──▶ Read HTTP Request
───────────
V2 Runing
V2 Blocking ──▶ Read HTTP Request
───────────
V3 Runing
V3 Blocking ──▶ Read HTTP Request
───────────
V1 Runing
V1 Blocking ──▶ Read Config File
───────────
V2 Runing
V2 Blocking ──▶ Read Database
───────────
V1 Runing
V1 Blocking ──▶ Read Database
───────────
V3 Runing
V3 Blocking ──▶ Read Database
───────────
V2 Runing
V2 Blocking ──▶ Send HTTP Response
───────────
V1 Runing
V1 Blocking ──▶ Send HTTP Response
───────────
V3 Runing
V3 Blocking ──▶ Send HTTP Response
───────────
End

如果我们单独看一个虚拟线程的代码,在一个方法中:

void register() {config = readConfigFile("./config.json"); // #1if (config.useFullName) {name = req.firstName + " " + req.lastName;}insertInto(db, name); // #2if (config.cache) {redis.set(key, name); // #3}
}

涉及到IO读写的#1、#2、#3处,执行到这些地方的时候(进入相关的JNI方法内部时)会自动挂起,并切换到其他虚拟线程执行。等到数据返回后,当前虚拟线程会再次调度并执行,因此,代码看起来是同步执行,但实际上是异步执行的。

使用虚拟线程

虚拟线程的接口和普通线程一样,唯一区别在于创建虚拟线程只能通过特定方法。

方法一:直接创建虚拟线程并运行:

/ 传入Runnable实例并立刻运行:
Thread vt = Thread.startVirtualThread(() -> {System.out.println("Start virtual thread...");Thread.sleep(10);System.out.println("End virtual thread.");
});

方法二:创建虚拟线程但不自动运行,而是手动调用start()开始运行:

// 创建VirtualThread:
Thread.ofVirtual().unstarted(() -> {System.out.println("Start virtual thread...");Thread.sleep(1000);System.out.println("End virtual thread.");
});
// 运行:
vt.start();

方法三:通过虚拟线程的ThreadFactory创建虚拟线程,然后手动调用start()开始运行:

// 创建ThreadFactory:
ThreadFactory tf = Thread.ofVirtual().factory();
// 创建VirtualThread:
Thread vt = tf.newThread(() -> {System.out.println("Start virtual thread...");Thread.sleep(1000);System.out.println("End virtual thread.");
});
// 运行:
vt.start();

直接调用start()实际上是由ForkJoinPool的线程来调度的。我们也可以自己创建调度线程,然后运行虚拟线程:

// 创建调度器:
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// 创建大量虚拟线程并调度:
ThreadFactory tf = Thread.ofVirtual().factory();
for (int i=0; i<100000; i++) {Thread vt = tf.newThread(() -> { ... });executor.submit(vt);// 也可以直接传入Runnable或Callable:executor.submit(() -> {System.out.println("Start virtual thread...");Thread.sleep(1000);System.out.println("End virtual thread.");return true;});
}

由于虚拟线程属于非常轻量级的资源,因此,用时创建,用完就扔,不要池化虚拟线程。

最后注意,虚拟线程在Java 21正式发布,在Java 19/20是预览功能,默认关闭,需要添加参数--enable-preview启用:

java --source 19 --enable-preview Main.java

使用限制

注意到只有以虚拟线程方式运行的代码,才会在执行IO操作时自动被挂起并切换到其他虚拟线程。普通线程的IO操作仍然会等待,例如,我们在main()方法中读写文件,是不会有调度和自动挂起的。

可以自动引发调度切换的操作包括:

  • 文件IO;
  • 网络IO;
  • 使用Concurrent库引发等待;
  • Thread.sleep()操作。

这是因为JDK为了实现虚拟线程,已经对底层相关操作进行了修改,这样应用层的Java代码无需修改即可使用虚拟线程。无法自动切换的语言需要用户手动调用await来实现异步操作:

async function doWork() {await readFile();await sendNetworkData();
}

在虚拟线程中,如果绕过JDK的IO接口,直接通过JNI读写文件或网络是无法实现调度的。此外,在synchronized块内部也无法调度。

小结

Java 19引入的虚拟线程是为了解决IO密集型任务的吞吐量,它可以高效通过少数线程去调度大量虚拟线程;

虚拟线程在执行到IO操作或Blocking操作时,会自动切换到其他虚拟线程执行,从而避免当前线程等待,能最大化线程的执行效率;

虚拟线程使用普通线程相同的接口,最大的好处是无需修改任何代码,就可以将现有的IO操作异步化获得更大的吞吐能力。

计算密集型任务不应使用虚拟线程,只能通过增加CPU核心解决,或者利用分布式计算资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/44160.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MQ四兄弟:如何保证消息顺序性

在当今的分布式系统架构中&#xff0c;消息队列&#xff08;MQ&#xff09;是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中&#xff0c;消息队列的顺序性保证显得更为重要。接下来&a…

使用libguestfs挂载qcow2磁盘镜像

挂载qcow2磁盘镜像的第一种方法是使用 libguestfs&#xff0c;它提供了一系列工具来访问和编辑 VM 磁盘镜像。libguestfs 支持几乎所有类型的磁盘镜像&#xff0c;包括 qcow2。你可以像下面这样&#xff0c;在Linux上安装libguestfs工具集。 1、安装guestmount工具 在基于 De…

主干网络篇 | YOLOv5/v7 更换骨干网络之 MobileNetV3 | 基于神经网络搜索的轻量级网络(2)

主干网络篇 | YOLOv5/v7 更换骨干网络之 MobileNetV3 | 基于神经网络搜索的轻量级网络 概述 YOLOv5和YOLOv7是目前主流的轻量级目标检测模型&#xff0c;在速度和精度方面取得了良好的平衡。然而&#xff0c;传统的YOLOv5/v7模型使用FPN和CSPNet等结构作为主干网络&#xff0…

SMU Summer 2024 Contest Round 2

[ABC357C] Sierpinski carpet - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路:通过因为图形的生成过程是完全一样的。可以通过递归&#xff0c;不断分形。函数process(x,y,k)定义为以坐标(x,y)为左上角,填充sqrt3(k)级的地毯。 int n; int c[800][800]; 默认全为…

【杂说咋说】近年来国土空间规划行业人员转行分析

这几年&#xff0c;国土空间规划行业的人员流动引起了不少关注。我们可以从几个方面来看这些变化&#xff1a; 考公务员 许多从事国土空间规划的专业人员选择了考公务员。这种选择相对稳定&#xff0c;不需要熬夜加班&#xff0c;工作环境也更为舒适。尤其是进入国家机关或住…

POSIX互斥锁和条件变量

一.概述 1.POXIS介绍 POXIS是一种操作系统接口标准&#xff0c;全称为“可移植操作系统接口”。 它最初由IEEE组织制定&#xff0c;目的是为了使不同的操作系统之间可以互相兼容。POSIX标准定义了一系列API&#xff08;应用程序接口&#xff09;和命令行工具&#xff0c;这些…

Mybatis核心问题总结

对MyBatis源码的理解 ORM框架&#xff1a;CRUD操作 1。SQL解析&#xff1a; 映射文件、注解--》映射器解析 XMLMapperBuilder MapperAnnotationBuilder 2。SQL执行: SqlSession 接口--》Executor --》 SimpleExecutor ReuseExecutor 【Statement--JDBC】 3。结果映射&…

Go语言---Json

JSON (JavaScript Object Notation)是一种比XML 更轻量级的数据交换格式&#xff0c;在易于人们阅读和编写的同时&#xff0c;也易于程序解析和生成。尽管JSON是 JavaScript的一个子集&#xff0c;但 JSON采用完全独立于编程语言的文本格式&#xff0c;且表现为键/值对集合的文…

【大模型LLM面试合集】大语言模型架构_layer_normalization

2.layer_normalization 1.Normalization 1.1 Batch Norm 为什么要进行BN呢&#xff1f; 在深度神经网络训练的过程中&#xff0c;通常以输入网络的每一个mini-batch进行训练&#xff0c;这样每个batch具有不同的分布&#xff0c;使模型训练起来特别困难。Internal Covariat…

【C++高阶】高效数据存储:理解并模拟实现红黑树Map与Set

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;了解 红黑树 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀模拟实现Map与Set &#x1f4d2;1.…

js ES6 part1

听了介绍感觉就是把js在oop的使用 作用域 作用域&#xff08;scope&#xff09;规定了变量能够被访问的“范围”&#xff0c;离开了这个“范围”变量便不能被访问&#xff0c; 作用域分为&#xff1a; 局部作用域、 全局作用域 1. 函数作用域&#xff1a; 在函数内部声明的…

爬取天气数据,利用Pyecharts作轮播图

爬取网站链接&#xff1a;https://lishi.tianqi.com/xiamen/202312.html 爬取了厦门市2023年一整年的天气数据&#xff0c;包括最高温&#xff0c;最低温&#xff0c;天气&#xff0c;风力风向等 爬虫代码&#xff1a; import requests import pandas as pd import csv from…

UML建模案例分析-时序图和类图的对应关系

概念 简单地说&#xff0c;类图定义了系统中的对象&#xff0c;时序图定义了对象之间的交互。 例子 一个电子商务系统&#xff0c;会员可通过电子商务系统购买零件。具体功能需求如下&#xff1a; 会员请求结账时&#xff0c;系统验证会员的账户是否处于登录状态&#xff1…

极狐GitLab 17.0 重磅发布,100+ DevSecOps功能更新来啦~【三】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab &#xff1a;https://gitlab.cn/install?channelcontent&utm_sourcecsdn 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署…

【基础篇】1.8 C语言基础(二)

2.9 预处理指令和宏定义 在STM32开发中,预处理和宏定义常用于配置硬件参数、启用或禁用特定功能、以及优化代码以适应不同的硬件配置或应用场景。通过合理地使用预处理和宏定义,我们可以编写更加灵活、可配置和高效的代码。 预处理指令如#include、#define等在C语言编程中起…

防火墙图形化界面策略和用户认证(华为)

目录 策略概要认证概要实验拓扑图题目要求一要求二要求三要求四要求五要求六 策略概要 安全策略概要&#xff1a; 安全策略&#xff08;Security Policy&#xff09;在安全领域具有双重含义。宏观上&#xff0c;安全策略指的是一个组织为保证其信息安全而建立的一套安全需求、…

uniapp 微信小程序接入MQTT

MQTT安装 前期准备 由于微信小程序需要wss&#xff0c;所以要有域名SSL证书 新建目录/srv/mosquitto/config&#xff0c;/srv/mosquitto/config/cert 目录/srv/mosquitto/config中新建配置文件mosquitto.conf&#xff0c;文件内容 persistence true persistence_location /m…

深入探索Apache Flink:流处理的艺术与实践

在当今的大数据时代&#xff0c;流处理已成为处理实时数据的关键技术。Apache Flink&#xff0c;作为一个开源的流处理框架&#xff0c;以其高吞吐量、低延迟和精确一次&#xff08;exactly-once&#xff09;的语义处理能力&#xff0c;在众多流处理框架中脱颖而出。本文将深入…

在树莓派设备上导出系统镜像

镜像导出 前提条件&#xff1a; 已获取可以正常使用的设备。已获取鼠标、键盘和电源适配器。已将设备接入可正常使用的网络。 操作步骤&#xff1a; 连接适配器给设备上电&#xff0c;正常启动设备&#xff0c;连接鼠标和键盘。在终端命令窗格执行如下命令&#xff0c;安装…

数据模型-ER图在数据模型设计中的应用

ER图在数据模型设计中的应用 1. ER图概述&#xff1a;起源与发展​ 实体-关系图&#xff08;Entity Relationship Diagram&#xff0c;简称ER图&#xff09;起源于1970年代&#xff0c;由Peter Chen首次提出&#xff0c;作为描述数据和信息间关系的图形化语言。随着数据库技术…