【源码分析】zeebe actor模型源码解读

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task

// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job

public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/172320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】Python将100个PDF文件对应的json文件存储到MySql数据库(源码)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

那些年,关于CKACKS认证的那些事儿?

前言 遥想2020年的年初&#xff0c;疫情封城封村之际&#xff0c;工作之余在B站将尚硅谷的linux中的k8s视频完整系统的学习了一遍&#xff0c;自此像是打通了任督二脉一般&#xff0c;开启了对k8s的探索之旅&#xff0c;一路也是磕磕绊绊的在工作中使用k8s。 终于在23年的6月仲…

【办公软件】电脑开机密码忘记了如何重置?

这个案例是家人的电脑&#xff0c;已经使用多年&#xff0c;又是有小孩操作过的&#xff0c;所以电脑密码根本不记得是什么了&#xff1f;那难道这台电脑就废了吗&#xff1f;需要重新装机吗&#xff1f;那里面的资料不是没有了&#xff1f; 为了解决以上问题&#xff0c;一般…

技术前沿探索:人工智能与大数据融合的未来

技术前沿探索&#xff1a;人工智能与大数据融合的未来 摘要&#xff1a;本博客将探讨人工智能与大数据融合领域的最新技术趋势、前沿研究方向以及挑战与机遇。通过介绍相关技术和案例&#xff0c;我们希望激发读者对这一领域的兴趣&#xff0c;并为其职业发展提供有益参考。 一…

万字解析设计模式之模板方法与解释器模式

一、模板方法模式 1.1概述 定义一个操作中算法的框架&#xff0c;而将一些步骤延迟到子类中&#xff0c;模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 例如&#xff0c;去银行办理业务一般要经过以下4个流程&#xff1a;取号、排队、办理具体业…

qt pdf 模块简介

文章目录 1. 技术平台2. Qt pdf 模块3. cmake 使用模块4. 许可证5. 简单示例5.1 CMakeLists.txt5.2 main.cpp 6. 总结 1. 技术平台 项目说明OSwin10 x64Qt6.6compilermsvc2022构建工具cmake 2. Qt pdf 模块 Qt PDF模块包含用于呈现PDF文档的类和函数。 QPdfDocument 类加载P…

监控同一局域网内其它主机上网访问信息

1.先取得网关IP 2.安装IPTABLES路由表 sudo apt-get install iptables 3.启用IP转发 sudo sysctl -p 查看配置是否生效 4.配置路由 iptables -t nat -A POSTROUTING -j MASQUERADE 配置成功后,使用sudo iptables-save查看

[leetCode]257. 二叉树的所有路径(两种方法)

257. 二叉树的所有路径 题目描述&#xff1a; 给你一个二叉树的根节点 root &#xff0c;按 任意顺序 &#xff0c;返回所有从根节点到叶子节点的路径。 叶子节点 是指没有子节点的节点。 示例&#xff1a; 输入&#xff1a;root [1,2,3,null,5]输出&#xff1a;["1-&g…

【Spring】Spring事务失效问题

&#x1f4eb;作者简介&#xff1a;小明java问道之路&#xff0c;2022年度博客之星全国TOP3&#xff0c;专注于后端、中间件、计算机底层、架构设计演进与稳定性建设优化&#xff0c;文章内容兼具广度、深度、大厂技术方案&#xff0c;对待技术喜欢推理加验证&#xff0c;就职于…

基于uniapp+vue微信小程序的健康饮食管理系统 907m6

设计这个微信小程序系统能使用户实现不需出门就可以在手机或电脑前进行网上查询美食信息、 运动视频等功能。 本系统由用户和管理员两大模块组成。用户界面显示在应用程序中&#xff0c;管理员界面显示在后台服务中&#xff0c;通过小程序端与服务端间进行数据交互与数据传输实…

自建CA实战之 《0x03 代码签名》

自建CA实战之 《0x03 代码签名》 本文针对Windows平台&#xff0c;介绍如何使用自建CA来签发代码签名证书。 之前的文章中&#xff0c;我们介绍了如何自建CA&#xff0c;以及如何使用自建CA来签发Web服务器证书、客户端证书。 本文将介绍如何使用自建CA来签发代码签名证书。…

文本转语音:微软语音合成标记语言 (SSML) 文本结构和事件

​ SSML 的语音服务实现基于万维网联合会的语音合成标记语言版本 1.0。 ​ 语音服务支持的元素可能与 W3C 标准不同。 每个 SSML 文档是使用 SSML 元素&#xff08;或标记&#xff09;创建的。 这些元素用于调整语音、风格、音节、韵律、音量等。 下面是 SSML 文档的基本结构…

CANdelaStudio 使用教程5 编辑DID

文章目录 在哪编辑DID的分类编辑快照数据添加 DID 在哪编辑 DID的分类 编辑快照数据 添加 DID

async函数和await关键字

async写在一个函数a前面&#xff0c;该函数变为异步函数&#xff0c;可在里面使用await关键字&#xff0c;await后面一般跟一个promise对象&#xff08;axios函数返回一个promise对象&#xff0c;里面有异步任务&#xff09;&#xff0c;await会原地等待该异步任务结果&#xf…

单细胞seurat入门—— 从原始数据到表达矩阵

根据所使用的建库方法&#xff0c;单细胞的RNA序列&#xff08;也称为读取&#xff08;reads&#xff09;或标签&#xff08;tags&#xff09;&#xff09;将从转录本的3端&#xff08;或5端&#xff09;&#xff08;10X Genomics&#xff0c;CEL-seq2&#xff0c;Drop-seq&…

枚举的第一行

2023年11月26日 问题: 好奇enum的所声明的枚举类的第一行是什么 从java技术卷1中第五章5.6中,了解是枚举类的实例 验证 错误信息: 解释: 此时只有有参构造 在这个枚举类里不能使用空,大概意思是说不能使用空参创建实例 校验 在原有的基础上创建一个无参构造 结果:不再报错,第…

【教学类-06-13】20231126 (55格版)趣味题(一)1-9加法题(10倍)(整十相加)

作品展示 背景需求&#xff1a; 1、会做加法题的孩子5分钟内完成题目&#xff0c;太快了&#xff0c;所以为了拉平差异&#xff0c;需要给这些会做另外的题目&#xff0c;比如提供一些他们没有做过的“趣味题形”。 2、好多次&#xff0c;听见大班孩子在互相“考试”——“老…

CSS常用笔记

1. 脱离文档流&#xff0c;用于微调 {position: relative; top: 10px; right: 0; } 2. flex布局大法 <div class"demo"><div class"demo-1"></div><div class"demo-2"></div><div class"demo-3"&…

从源码重新真正认识RateLimiter(SmoothBursty实现)

前言 相信大家对于谷歌RateLimiter一定并不陌生,在项目中应该也经常拿来进行限流&#xff0c;但是对于其实现原理并不一定能用熟于心&#xff0c;本文带大家从源码探究RateLimiter的设计与具体实现。 RateLimiter的组成 从源码可以看到&#xff0c;RateLimiter由stopwatch与m…

Elasticsearch集群部署,配置head监控插件

Elasticsearch是一个开源搜索引擎&#xff0c;基于Lucene搜索库构建&#xff0c;被广泛应用于全文搜索、地理位置搜索、日志处理、商业分析等领域。它采用分布式架构&#xff0c;可以处理大规模数据集和支持高并发访问。Elasticsearch提供了一个简单而强大的API&#xff0c;可以…