【源码分析】zeebe actor模型源码解读

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task

// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job

public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/172320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】Python将100个PDF文件对应的json文件存储到MySql数据库(源码)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

那些年,关于CKACKS认证的那些事儿?

前言 遥想2020年的年初&#xff0c;疫情封城封村之际&#xff0c;工作之余在B站将尚硅谷的linux中的k8s视频完整系统的学习了一遍&#xff0c;自此像是打通了任督二脉一般&#xff0c;开启了对k8s的探索之旅&#xff0c;一路也是磕磕绊绊的在工作中使用k8s。 终于在23年的6月仲…

【办公软件】电脑开机密码忘记了如何重置?

这个案例是家人的电脑&#xff0c;已经使用多年&#xff0c;又是有小孩操作过的&#xff0c;所以电脑密码根本不记得是什么了&#xff1f;那难道这台电脑就废了吗&#xff1f;需要重新装机吗&#xff1f;那里面的资料不是没有了&#xff1f; 为了解决以上问题&#xff0c;一般…

编程语言发展史:Python语言的兴起和特点

预计更新 第一部分&#xff1a;早期编程语言 1.1布尔代数和机器语言 1.2汇编语言的出现和发展 1.3高级语言的兴起 第二部分&#xff1a;主流编程语言 1.1 C语言的诞生及其影响 1.2 C语言的发展和应用 1.3 Java语言的出现和发展 1.4 Python语言的兴起和特点 1.5 JavaScript语言…

技术前沿探索:人工智能与大数据融合的未来

技术前沿探索&#xff1a;人工智能与大数据融合的未来 摘要&#xff1a;本博客将探讨人工智能与大数据融合领域的最新技术趋势、前沿研究方向以及挑战与机遇。通过介绍相关技术和案例&#xff0c;我们希望激发读者对这一领域的兴趣&#xff0c;并为其职业发展提供有益参考。 一…

万字解析设计模式之模板方法与解释器模式

一、模板方法模式 1.1概述 定义一个操作中算法的框架&#xff0c;而将一些步骤延迟到子类中&#xff0c;模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 例如&#xff0c;去银行办理业务一般要经过以下4个流程&#xff1a;取号、排队、办理具体业…

ubuntu22.04安装swagboot遇到的问题

一、基本情况 系统&#xff1a;u 22.04 python&#xff1a; 3.10 二、问题描述 swagboot官方提供的安装路径言简意赅:python3 -m pip install --user snagboot 当然安装python3和pip是基本常识&#xff0c;这里就不再赘述。 可是在安装的时候出现如下提示说 Failed buildin…

qt pdf 模块简介

文章目录 1. 技术平台2. Qt pdf 模块3. cmake 使用模块4. 许可证5. 简单示例5.1 CMakeLists.txt5.2 main.cpp 6. 总结 1. 技术平台 项目说明OSwin10 x64Qt6.6compilermsvc2022构建工具cmake 2. Qt pdf 模块 Qt PDF模块包含用于呈现PDF文档的类和函数。 QPdfDocument 类加载P…

监控同一局域网内其它主机上网访问信息

1.先取得网关IP 2.安装IPTABLES路由表 sudo apt-get install iptables 3.启用IP转发 sudo sysctl -p 查看配置是否生效 4.配置路由 iptables -t nat -A POSTROUTING -j MASQUERADE 配置成功后,使用sudo iptables-save查看

[leetCode]257. 二叉树的所有路径(两种方法)

257. 二叉树的所有路径 题目描述&#xff1a; 给你一个二叉树的根节点 root &#xff0c;按 任意顺序 &#xff0c;返回所有从根节点到叶子节点的路径。 叶子节点 是指没有子节点的节点。 示例&#xff1a; 输入&#xff1a;root [1,2,3,null,5]输出&#xff1a;["1-&g…

每日OJ题_算法_双指针⑦力扣15. 三数之和

目录 力扣15. 三数之和 解析代码 力扣15. 三数之和 难度 中等 给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复的三…

【Spring】Spring事务失效问题

&#x1f4eb;作者简介&#xff1a;小明java问道之路&#xff0c;2022年度博客之星全国TOP3&#xff0c;专注于后端、中间件、计算机底层、架构设计演进与稳定性建设优化&#xff0c;文章内容兼具广度、深度、大厂技术方案&#xff0c;对待技术喜欢推理加验证&#xff0c;就职于…

C/C++---------------LeetCode第229. 多数元素 II

多数元素|| 题目及要求哈希算法 题目及要求 给定一个大小为 n 的整数数组&#xff0c;找出其中所有出现超过 ⌊ n/3 ⌋ 次的元素。 示例 1&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&#xff1a;[3] 示例 2&#xff1a; 输入&#xff1a;nums [1] 输出&#xff1a;…

基于uniapp+vue微信小程序的健康饮食管理系统 907m6

设计这个微信小程序系统能使用户实现不需出门就可以在手机或电脑前进行网上查询美食信息、 运动视频等功能。 本系统由用户和管理员两大模块组成。用户界面显示在应用程序中&#xff0c;管理员界面显示在后台服务中&#xff0c;通过小程序端与服务端间进行数据交互与数据传输实…

自建CA实战之 《0x03 代码签名》

自建CA实战之 《0x03 代码签名》 本文针对Windows平台&#xff0c;介绍如何使用自建CA来签发代码签名证书。 之前的文章中&#xff0c;我们介绍了如何自建CA&#xff0c;以及如何使用自建CA来签发Web服务器证书、客户端证书。 本文将介绍如何使用自建CA来签发代码签名证书。…

MYSQL中DML、DDL常用语句记录

MYSQL中DML、DDL常用语句记录 DML 在 MySQL 中&#xff0c;DML (Data Manipulation Language) 是一类用于查询和操作数据的 SQL 语句。以下是常用的 DML 语句&#xff1a; 1、SELECT SELECT 语句用于查询数据库中的数据。语法如下&#xff1a; SELECT column1, column2, .…

Kafka(一)在WSL单机搭建Kafka伪集群

目录 1 运行Kafka单实例1.1 Windws1.1.1 安装包下载1.1.2 修改环境变量1.1.3 修改配置文件1.1.4 启动Kafka单机版 1.2 Linux1.2.1 安装包下载1.2.2 创建目录1.2.3 添加环境变量1.2.4 修改配置文件1.2.5 运行Kafka1.2.6 停止Kafka 2 搭建Kafka集群2.1 搭建Zookeeper集群2.2 搭建…

文本转语音:微软语音合成标记语言 (SSML) 文本结构和事件

​ SSML 的语音服务实现基于万维网联合会的语音合成标记语言版本 1.0。 ​ 语音服务支持的元素可能与 W3C 标准不同。 每个 SSML 文档是使用 SSML 元素&#xff08;或标记&#xff09;创建的。 这些元素用于调整语音、风格、音节、韵律、音量等。 下面是 SSML 文档的基本结构…

【Linux常用命令】-文件写入相关

一、rm命令&#xff0c;文件删除 1.相关参数 -f&#xff08;–force&#xff09;&#xff1a;强制删除文件或目录&#xff0c;无需确认。 -r&#xff08;–recursive&#xff09;&#xff1a;递归地删除目录及其内容。 -i&#xff08;–interactive&#xff09;&#xff1a;交…

ultrascale FPGA

1.工艺从mos到FIN,查了半天资料&#xff0c;不如bili的intel介绍视频&#xff0c;其实是把DS做成3D结构&#xff0c;减小DS漏电流&#xff1b; 2.型号的尾数是以百万门为标定的&#xff1b; 3.slice&#xff08;切片&#xff09;是CLB的组成单元&#xff0c;slice又包含LUT&a…