文章目录
- 一、引入
- 二、概念
- 1、什么是协程
- 2、轻量级怎么理解?为什么快?
- 3、用途
- 4、信息交换方式
- 5、协程和线程的比较
- 6、各语言对协程的支持
- 三、Kilim
- 1、是什么
- 2、原理
- 四、实践案例
- 1、打印文字案例
- 2、试试能创建多少个任务
- 1)多线程
- 2)协程
- 3、打车计价
- 1)多线程
- 2)协程
- 4、视频推荐算法
- 1)多线程
- 2)协程
- 五、总结
- 遇到问题:has either not been woven or the classpath is incorrect
一、引入
在传统处理中,每个请求占用一个线程处理完整业务逻辑,所以系统的吞吐量取决于每个线程的耗时操作,如果遇到很耗时的IO,那么整个系统的吞吐量巨低,因为线程大部分时间处理阻塞等待了,无法充分压榨CPU资源。在多核环境下,我们通过使用多线程,充分利用CPU资源,达到并行计算的目的,但会导致内存占用,并发锁随之带来的等待,如果执行耗时的IO怎么处理?
我们可以通过异步来消除等待,提高CPU利用率,Java领域的技术有Future、Calback、Sevlet3.0、Reactive等,主要通过回调函数或事件驱动来实现,那它有什么问题?
读写、debug、测试困难…
那么,还有其它方式?那就是可以通过协程来消除等待,写法还是原来的,只不过程序在等待IO时,已经帮我们把线程释放了,类似语法糖。
语法糖是指在编程语言中提供的一种简洁、易读的语法形式,它并不提供新的功能,只是为了让代码更加易于理解和书写。
二、概念
1、什么是协程
协程,是一种轻量级线程,它的调度完全由用户控制,也被称为用户级线程或纤程。它是一种并发编程的技术,允许在单个线程内实现多个执行流。从名字可以看出,是相互协作的理念,而不是像线程一样去争抢资源,可以在执行过程中暂停和恢复,而不会阻塞整个线程。因为它拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,等切换回来时,恢复原先保存的寄存器上下文和栈,直接操作栈没有内核切换的开销,速度很快。
2、轻量级怎么理解?为什么快?
-
线程切换,从一个线程切换到另一个线程时,通常涉及到从用户态切换到内核态,然后再切换回用户态。这是因为线程切换需要访问操作系统的调度器和线程管理数据结构,这些操作需要在内核态下执行。
-
协程切换与线程切换有所不同。协程切换是在用户态下进行的,不涉及内核态和用户态之间的切换。在协程中,切换是由协程自身控制的,而不是由操作系统的调度器控制。协程可以在执行过程中主动挂起自己,并将执行权转移到其他协程上。这种切换是协作式的,需要协程之间进行协作和协调。
总的来说,协程完全在用户态进行,不涉及内核态,减少了大开销。
3、用途
-
异步编程:协程可以在等待I/O操作时释放CPU资源,从而提高程序的并发性能。通过使用协程,可以编写更简洁、可读性更高的异步代码。
-
任务调度:协程可以用于实现任务调度器,可以按照优先级或其他规则调度执行不同的任务。这对于处理大量并发任务的应用程序非常有用。
-
事件驱动编程:协程可以用于处理事件驱动的编程模型,例如GUI应用程序或网络服务器。通过使用协程,可以编写响应式的代码,以便在事件发生时立即执行相应的操作。
-
生成器:协程可以用于实现生成器函数,这是一种可以暂停和恢复执行的函数。生成器函数可以用于惰性计算、无限序列生成等场景。
4、信息交换方式
-
共享内存
多个线程可以访问和修改同一块内存区域,通过读写该内存区域来进行信息交换,需要通过锁来保证线程安全。 -
消息传递
通过发送和接收消息来进行信息交换,可以使用队列、管道、信号量等机制来实现。例如CSP模型,Go语言中的goroutine和channel,以及Erlang语言中的进程和消息传递。
5、协程和线程的比较
- 调度所属:
线程由OS内核完成,协程由用户完成 - 切换开销:
线程涉及模式切换(用户态和内核态)、PC、SP、DX等共16个寄存器的刷新
协程只涉及PC(程序计数器)、SP(堆栈指针)、DX(通用寄存器)的值修改 - 并发性能:协程通常比线程更轻量级,可以在单个线程中同时运行多个协程。
- 内存占用:由于协程在单个线程中运行,所以它们通常比线程占用更少的内存。
- 数据同步:线程通常使用锁等机制来实现数据同步。而协程使用消息传递和异步等待来实现协程间的通信和协作,只需要判断状态即可。
注:为什么要加锁,线程由CPU控制,我们并不知道什么时候执行完,如果我们自己可以控制,就不需要加锁了
6、各语言对协程的支持
- Python:Python 语言通过 yield 和 send 的方式实现协程。在 Python 3.5 以后,async/await 成为了更好的替代方案。
- Go:Go语言通过goroutine和channel提供了对协程的支持。goroutine是一种轻量级的线程,可以在函数调用前加上go关键字来创建。channel用于协程之间的通信和同步。
- Java:Java 语言并没有对协程的原生支持,但是某些开源框架模拟出了协程的功能。例如,Kilim、Loom、Quasar框架。
三、Kilim
官方GitHub:https://github.com/kilim/kilim
1、是什么
Kilim是一个Java库,用于实现协程。
Kilim: Continuations, Fibers, Actors and message passing for the JVM
Kilim is composed of 2 primary components:
- The Kilim weaver modifies the bytecode of compiled java classes, enabling a method to save it’s state and yield control of it’s thread, ie to cooperatively multitask
- The Kilim runtime library provides constructs that leverage the weaver to simplify concurrent programming, including Coroutines, Task, Actors, Mailboxes (aka channels), and Generators
Together, these facilities allow for simple concurrency and can scale to millions of concurrent tasks
2、原理
它通过字节码转换技术来实现协程的原理。具体来说,Kilim会对Java字节码进行修改,将方法调用和返回的地方替换为协程切换的逻辑。这样,在运行时,当一个协程遇到阻塞操作时,它会暂停执行,并将控制权切换给其他协程。当阻塞操作完成后,控制权会再次切换回来,协程继续执行。这种方式实现了协程的非阻塞调度,提高了并发性能和资源利用率。
简单来说,就是通过Java字节码状态来实现暂停、继续执行。
四、实践案例
我们使用Kilim来实现协程,需要引入依赖,如下:
<dependency><groupId>org.db4j</groupId><artifactId>kilim</artifactId><version>2.0.2</version>
</dependency>
1、打印文字案例
public static void main(String[] args) {Task task = new Task() {@Overridepublic void execute() throws Pausable {System.out.print("我");yield();System.out.print("中");yield();System.out.print("!");}};task.run();System.out.print("爱");task.run();System.out.print("国");task.run();
}
输出结果:
我爱中国!
2、试试能创建多少个任务
1)多线程
public static void createJavaTask() {long current = System.nanoTime();for (int i = 1; i <= n; i++) {Thread t = new Thread();t.start();if (i % n == 0) {System.out.println("created " + i + " tasks cost " + (System.nanoTime() - current));}}
}
尝试创建50W个任务,耗时如下:
created 500000 same tasks cost 26026263500
想尝试创建100W个任务,发现阻塞很久还是没反应。。。
2)协程
public static void createKilimTask() {long current = System.nanoTime();for (int i = 1; i <= n; i++) {Task t = new ForlanTask();t.start();if (i % n == 0) {System.out.println("created " + i + " tasks cost " + (System.nanoTime() - current));}}
}class ForlanTask extends Task {@Overridepublic void execute() throws Pausable {}
}
尝试创建200W个任务,耗时如下:
created 2000000 tasks cost 2736375000
总的来说,协程创建快很多,正如官方所说,可以达到百万并发。
Together, these facilities allow for simple concurrency and can scale to millions of concurrent tasks
3、打车计价
主要分为距离和时间计价,然后汇总算最终的价格
距离和时间计价公共方法如下:
public static Integer getDistanceCost(Integer distance) {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new IllegalStateException(e);}return distance * 3;
}
public static Integer getTimeCost(Integer time) {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new IllegalStateException(e);}return time * 2;
}
1)多线程
public static Integer getCompletableFuture(Integer distance, Integer time) {CompletableFuture<Integer> distanceFuture = CompletableFuture.supplyAsync(() -> {return getDistanceCost(distance);});CompletableFuture<Integer> timeFuture = CompletableFuture.supplyAsync(() -> {return getTimeCost(time);});int res = 0;try {Integer res1 = distanceFuture.get();Integer res2 = timeFuture.get();res = res1 + res2;} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}return res;
}
测试代码:
long start = System.currentTimeMillis();
Integer completableFuture1 = getCompletableFuture(10, 20);
Integer completableFuture2 = getCompletableFuture(30, 50);
System.out.println(completableFuture1 + completableFuture2 + " 元, cost:" + (System.currentTimeMillis() - start));
输出结果:
260 元, cost:6425
2)协程
class CostTask extends Task.Spawn<Integer> {private Mailbox<Integer> distanceMailbox = new Mailbox<Integer>();private Mailbox<Integer> timeMailbox = new Mailbox<Integer>();private Integer distance;private Integer time;CostTask(Integer distance, Integer time) {this.distance = distance;this.time = time;timeMailbox.addMsgAvailableListener(new EventSubscriber() {@Overridepublic void onEvent(EventPublisher ep, Event e) {continueRun();}});}public void continueRun() {run();}@Overridepublic void execute() throws Pausable, Exception {new DistanceTask(distanceMailbox, distance).start();new TimeTask(timeMailbox, time).start();yield();exitResult = distanceMailbox.get() + timeMailbox.get();}
}
class DistanceTask extends Task {private Mailbox<Integer> mailbox;private Integer distance;public DistanceTask(Mailbox<Integer> mailbox, Integer distance) {this.mailbox = mailbox;this.distance = distance;}@Overridepublic void execute() throws Pausable, Exception {mailbox.put(getDistanceCost(distance));}
}
class TimeTask extends Task {private Mailbox<Integer> mailbox;private Integer time;public TimeTask(Mailbox<Integer> mailbox, Integer time) {this.mailbox = mailbox;this.time = time;}@Overridepublic void execute() throws Pausable, Exception {mailbox.put(getTimeCost(time));}
}
测试代码:
CostTask costTask1 = new CostTask(10, 20);
costTask1.run();
CostTask costTask2 = new CostTask(30, 50);
costTask2.run();
ExitMsg<Integer> join1 = costTask1.joinb();
ExitMsg<Integer> join2 = costTask2.joinb();
System.out.println(join1.result + join2.result + " 元, cost:" + (System.currentTimeMillis() - start));
输出结果:
260 元, cost:3295
4、视频推荐算法
简单视频推荐系统算法,挑选出最符合的结果
推荐因子:
1、本人观看历史记录的标签权重
2、当下最热视频标签权重
获取推荐视频工具类
public class RecommendationUtils {public static String getRecommendedVideos(HashMap<String, Double> usertag, HashMap<String, HashMap<String, Double>> movieFactor) {double maxScore = 0;String result = null;for (String movie : movieFactor.keySet()) {HashMap<String, Double> factor = movieFactor.get(movie);double score = 0;for (String tag : usertag.keySet()) {if (factor.containsKey(tag)) {score += factor.get(tag) * usertag.get(tag);}}if (score > maxScore) {maxScore = score;result = movie;}}return result;}public static HashMap<String, Double> getViewHistoryTag(String accountId) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}HashMap<String, Double> result = new HashMap<String, Double>();result.put("电影", 1.0);result.put("动漫", 0.8);result.put("搞笑", 0.9);return result;}public static HashMap<String, HashMap<String, Double>> getHottestMovie() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}HashMap<String, Double> hzw = new HashMap<String, Double>();hzw.put("动漫", 0.7);hzw.put("剧情", 0.6);HashMap<String, Double> xltfn = new HashMap<String, Double>();xltfn.put("搞笑", 0.7);xltfn.put("电影", 0.4);HashMap<String, HashMap<String, Double>> movieFactor = new HashMap<String, HashMap<String, Double>>();movieFactor.put("海贼王", hzw);movieFactor.put("夏洛特烦恼", xltfn);return movieFactor;}
}
工具类中一共3个方法,其中getViewHistoryTags是获取历史标签的权重,耗时1秒;getHottestMovie是获取当下热门视频标签的权重,耗时2秒;getRecommendedVideos是合并两种标签,计算出最终推荐视频;
1)多线程
public HashMap<String, String> getRecommendedVideos(String[] accountIds) {HashMap<String, String> result = new HashMap<String, String>();for (String account : accountIds) {result.put(account, getRecommendedVideos(account));}return result;
}
public String getRecommendedVideos(String accountId) {ExecutorService executor = Executors.newCachedThreadPool();Callable<HashMap<String, HashMap<String, Double>>> hotmovie = () -> {return RecommendationUtils.getHottestMovie();};FutureTask<HashMap<String, HashMap<String, Double>>> task1 = new FutureTask<HashMap<String, HashMap<String, Double>>>(hotmovie);log.info("submit task for hot movie data");executor.submit(task1);Callable<HashMap<String, Double>> history = () -> {return RecommendationUtils.getViewHistoryTag(accountId);};FutureTask<HashMap<String, Double>> task2 = new FutureTask<HashMap<String, Double>>(history);log.info("submit task for hot view history data");executor.submit(task2);try {log.info("calculate the result...");return RecommendationUtils.getRecommendedVideos(task2.get(), task1.get());} catch (InterruptedException e) {log.error(e.getMessage());} catch (ExecutionException e) {log.error(e.getMessage());}return null;
}
执行结果:
{1=夏洛特烦恼, 2=夏洛特烦恼, 3=夏洛特烦恼, 4=夏洛特烦恼, 5=夏洛特烦恼} task cost:10090
耗时10秒多,我们循坏查询了5个用户id,每个用户并行获取历史视频和热视频标签权重,取决于耗时最长的时间,所以是2秒,最终就是5*2=10秒
2)协程
public HashMap<String, String> getRecommendedVideos(String[] accountIds) {HashMap<String, String> result = new HashMap<String, String>();ArrayDeque<Recommended> ad = new ArrayDeque<Recommended>();for (String accountId : accountIds) {Recommended task = new Recommended(accountId);task.run();ad.push(task);}while (!ad.isEmpty()) {Recommended item = ad.poll();ExitMsg<String> msg = item.joinb();result.put(item.getAccountId(), msg.result);}return result;
}
class Recommended extends Task.Spawn<String> {private Mailbox<HashMap<String, Double>> historyBox = new Mailbox<HashMap<String, Double>>();private Mailbox<HashMap<String, HashMap<String, Double>>> hotMovieBox = new Mailbox<HashMap<String, HashMap<String, Double>>>();private String accountId;public String getAccountId() {return accountId;}Recommended(String accountId) {this.accountId = accountId;hotMovieBox.addMsgAvailableListener(new EventSubscriber() {@Overridepublic void onEvent(EventPublisher ep, Event e) {continueRun();}});}public void continueRun() {log.info("continue run");this.run();}@Overridepublic void execute() throws Pausable, Exception {new HistoryTask(accountId, historyBox).start();new HotMovieTask(hotMovieBox).start();log.info("rest for a while");yield();log.info("recommendation task begin");exitResult = RecommendationUtils.getRecommendedVideos(historyBox.get(), hotMovieBox.get());}
}
class HistoryTask extends Task.Spawn<HashMap<String, Double>> {private Mailbox<HashMap<String, Double>> mailbox;private String accountId;public HistoryTask(String accountId, Mailbox<HashMap<String, Double>> mailbox) {this.accountId = accountId;this.mailbox = mailbox;}@Overridepublic void execute() throws Pausable, Exception {mailbox.put(RecommendationUtils.getViewHistoryTag(accountId));}
}
class HotMovieTask extends Task {private Mailbox<HashMap<String, HashMap<String, Double>>> mailbox;public HotMovieTask(Mailbox<HashMap<String, HashMap<String, Double>>> mailbox) {this.mailbox = mailbox;}@Overridepublic void execute() throws Pausable, Exception {mailbox.put(RecommendationUtils.getHottestMovie());}
}
执行结果:
{1=夏洛特烦恼, 2=夏洛特烦恼, 3=夏洛特烦恼, 4=夏洛特烦恼, 5=夏洛特烦恼} task cost:2026
我们同样循坏查询了5个用户id,只不过每次查询,直接释放了线程,等拿到结果,再进行最终计算,最终取决于耗时最长的时间,所以是2秒
五、总结
协程由用户控制,占用内存小,开销小,高性能,优于多线程,能够创建更多的数量,执行更多的任务,可以达到百万并发,适合用于异步操作、并发任务、高吞吐量的场景,提供了更高效、简洁的编程模型,在Java中,可以使用Kilim框架来实现,主要是通过Java字节码状态来实现暂停、继续执行。
遇到问题:has either not been woven or the classpath is incorrect
详细错误如下:
############################################################
Task class coroutines.SimplePauseContine$1 has either not been woven or the classpath is incorrect
############################################################
java.lang.Exception: Stack traceat java.lang.Thread.dumpStack(Thread.java:1336)at kilim.Task.errNotWoven(Task.java:306)at kilim.Task.execute(Task.java:483)at kilim.Task.run(Task.java:550)at coroutines.SimplePauseContine.main(SimplePauseContine.java:20)
解决方式:在pom.xml中加入插件,然后重新打包一下:mvn package
<build><plugins><plugin><groupId>org.db4j</groupId><artifactId>kilim</artifactId><version>2.0.1</version><executions><execution><goals><goal>weave</goal></goals></execution></executions></plugin></plugins>
</build>