聊聊PowerJob的ServerDeployContainerRequest

本文主要研究一下PowerJob的ServerDeployContainerRequest

ServerDeployContainerRequest

tech/powerjob/common/request/ServerDeployContainerRequest.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerDeployContainerRequest implements PowerSerializable {/*** 容器ID*/private Long containerId;/*** 容器名称*/private String containerName;/*** 文件名(MD5值),用于做版本校验和文件下载*/private String version;/*** 下载地址*/private String downloadURL;
}

ServerDeployContainerRequest定义了containerId、containerName、version、downloadURL属性

onReceiveServerDeployContainerRequest

tech/powerjob/worker/actors/WorkerActor.java

    @Handler(path = WORKER_HANDLER_DEPLOY_CONTAINER)public void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {OmsContainerFactory.deployContainer(request);}

WorkerActor的onReceiveServerDeployContainerRequest用于处理ServerDeployContainerRequest,它委托给了OmsContainerFactory.deployContainer

deployContainer

tech/powerjob/worker/container/OmsContainerFactory.java

    public static synchronized void deployContainer(ServerDeployContainerRequest request) {Long containerId = request.getContainerId();String containerName = request.getContainerName();String version = request.getVersion();log.info("[OmsContainer-{}] start to deploy container(name={},version={},downloadUrl={})", containerId, containerName, version, request.getDownloadURL());OmsContainer oldContainer = CARGO.get(containerId);if (oldContainer != null && version.equals(oldContainer.getVersion())) {log.info("[OmsContainer-{}] version={} already deployed, so skip this deploy task.", containerId, version);return;}String filePath = CONTAINER_DIR + containerId + "/" + version + ".jar";// 下载Container到本地File jarFile = new File(filePath);try {if (!jarFile.exists()) {FileUtils.forceMkdirParent(jarFile);FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());}// 创建新容器OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);newContainer.init();// 替换容器CARGO.put(containerId, newContainer);log.info("[OmsContainer-{}] deployed new version:{} successfully!", containerId, version);if (oldContainer != null) {// 销毁旧容器oldContainer.destroy();}} catch (Exception e) {log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);// 如果部署失败,则删除该 jar(本次失败可能是下载jar出错导致,不删除会导致这个版本永久无法重新部署)CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));}}

deployContainer方法先找到旧的OmsContainer,然后判断version是否一样,一样就不用重新部署,否则先从本地查找jar包,找不到则根据downloadURL去下载,然后创建OmsJarContainer,执行其init方法,若存在旧的OmsContainer则执行其destroy方法

OmsContainer

tech/powerjob/worker/container/OmsContainer.java

public interface OmsContainer extends LifeCycle {/*** 获取处理器* @param className 全限定类名* @return 处理器(可以是 MR、BD等处理器)*/BasicProcessor getProcessor(String className);/*** 获取容器的类加载器* @return 类加载器*/OhMyClassLoader getContainerClassLoader();Long getContainerId();Long getDeployedTime();String getName();String getVersion();/*** 尝试释放容器资源*/void tryRelease();
}

OmsContainer接口定义了getProcessor、getContainerClassLoader、getContainerId、getDeployedTime、getName、getVersion、tryRelease方法

OmsJarContainer

tech/powerjob/worker/container/OmsJarContainer.java

@Slf4j
public class OmsJarContainer implements OmsContainer {private final Long containerId;private final String name;private final String version;private final File localJarFile;private final Long deployedTime;// 引用计数器private final AtomicInteger referenceCount = new AtomicInteger(0);private OhMyClassLoader containerClassLoader;private ClassPathXmlApplicationContext container;private final Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap();public OmsJarContainer(Long containerId, String name, String version, File localJarFile) {this.containerId = containerId;this.name = name;this.version = version;this.localJarFile = localJarFile;this.deployedTime = System.currentTimeMillis();}//......
}    

OmsJarContainer实现了OmsContainer接口

getProcessor

    public BasicProcessor getProcessor(String className) {BasicProcessor basicProcessor = processorCache.computeIfAbsent(className, ignore -> {Class<?> targetClass;try {targetClass = containerClassLoader.loadClass(className);} catch (ClassNotFoundException cnf) {log.error("[OmsJarContainer-{}] can't find class: {} in container.", containerId, className);return null;}// 先尝试从 Spring IOC 容器加载try {return (BasicProcessor) container.getBean(targetClass);} catch (BeansException be) {log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", containerId);} catch (ClassCastException cce) {log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", containerId, className);return null;} catch (Exception e) {log.error("[OmsJarContainer-{}] get bean failed for {}.", containerId, className, e);return null;}// 直接实例化try {Object obj = targetClass.getDeclaredConstructor().newInstance();return (BasicProcessor) obj;} catch (Exception e) {log.error("[OmsJarContainer-{}] load {} failed", containerId, className, e);}return null;});if (basicProcessor != null) {// 引用计数 + 1referenceCount.getAndIncrement();}return basicProcessor;}

getProcessor方法会先通过containerClassLoader.loadClass去加载对应的processor类,加载不到则直接返回,之后根据targetClass去spring容器查找,若查找不到则直接通过targetClass.getDeclaredConstructor().newInstance()尝试实例化

init

    public void init() throws Exception {log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath());URL jarURL = localJarFile.toURI().toURL();// 创建类加载器(父类加载为 Worker 的类加载)this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());// 解析 PropertiesProperties properties = new Properties();try (InputStream propertiesURLStream = containerClassLoader.getResourceAsStream(ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME)) {if (propertiesURLStream == null) {log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);}properties.load(propertiesURLStream);log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties);}String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);if (StringUtils.isEmpty(packageName)) {log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);throw new PowerJobException("invalid jar file");}// 加载用户类containerClassLoader.load(packageName);// 创建 Spring IOC 容器(Spring配置文件需要填相对路径)// 需要切换线程上下文类加载器以加载 JDBC 类驱动(SPI)ClassLoader oldCL = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(containerClassLoader);try {this.container = new ClassPathXmlApplicationContext(new String[]{ContainerConstant.SPRING_CONTEXT_FILE_NAME}, false);this.container.setClassLoader(containerClassLoader);this.container.refresh();}finally {Thread.currentThread().setContextClassLoader(oldCL);}log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath());}

init方法根据jar包地址创建OhMyClassLoader,然后先解析oms-worker-container.properties,执行properties.load(propertiesURLStream),接着获取配置的packageName,执行containerClassLoader.load(packageName)加载类,然后根据oms-worker-container-spring-context.xml创建spring的ClassPathXmlApplicationContext,设置其classLoader,执行其refresh方法

destroy

    public void destroy() throws Exception {// 没有其余引用时,才允许执行 destroyif (referenceCount.get() <= 0) {try {if (localJarFile.exists()) {FileUtils.forceDelete(localJarFile);}}catch (Exception e) {log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", containerId, localJarFile.getPath(), e);}try {processorCache.clear();container.close();containerClassLoader.close();log.info("[OmsJarContainer-{}] container destroyed successfully", containerId);}catch (Exception e) {log.error("[OmsJarContainer-{}] container destroyed failed", containerId, e);}return;}log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", containerId, referenceCount.get());}

destroy方法在referenceCount小于等于0时会先删除localJarFile,然后执行processorCache.clear()、ClassPathXmlApplicationContext的close、OhMyClassLoader的close

JarContainerProcessorFactory

tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java

@Slf4j
public class JarContainerProcessorFactory implements ProcessorFactory {private final WorkerRuntime workerRuntime;public JarContainerProcessorFactory(WorkerRuntime workerRuntime) {this.workerRuntime = workerRuntime;}@Overridepublic Set<String> supportTypes() {return Sets.newHashSet(ProcessorType.EXTERNAL.name());}@Overridepublic ProcessorBean build(ProcessorDefinition processorDefinition) {String processorInfo = processorDefinition.getProcessorInfo();String[] split = processorInfo.split("#");String containerName = split[0];String className = split[1];log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);if (omsContainer != null) {return new ProcessorBean().setProcessor(omsContainer.getProcessor(className)).setClassLoader(omsContainer.getContainerClassLoader());} else {log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);}return null;}
}

JarContainerProcessorFactory的build方法它根据#来解析出containerId及className,然后通过OmsContainerFactory.fetchContainer去查找容器,然后通过omsContainer.getProcessor(className)获取对应的processor;JarContainerProcessorFactory的supportTypes为EXTERNAL(外部处理器(动态加载))

小结

WorkerActor的onReceiveServerDeployContainerRequest用于处理ServerDeployContainerRequest,它委托给了OmsContainerFactory.deployContainer;deployContainer方法先找到旧的OmsContainer,然后判断version是否一样,一样就不用重新部署,否则先从本地查找jar包,找不到则根据downloadURL去下载,然后创建OmsJarContainer,执行其init方法,若存在旧的OmsContainer则执行其destroy方法;init方法根据jar包地址创建OhMyClassLoader,创建ClassPathXmlApplicationContext,设置其classLoader,执行其refresh方法;destroy方法在referenceCount小于等于0时会先删除localJarFile,然后执行processorCache.clear()、ClassPathXmlApplicationContext的close、OhMyClassLoader的close。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/661064.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】 归并排序超详解

1.基本思想 归并排序&#xff08;MERGE-SORT&#xff09;是建立在归并操作上的一种有效的排序算法,该算法是采用分治法&#xff08;Divide andConquer&#xff09;的一个非常典型的应用。 将已有序的子序列合并&#xff0c;得到完全有序的序列&#xff0c;即先使每个子序列有序…

Vue3的Props

Vue 3中的props是用于接收父组件传递的数据的属性。在Vue 3中&#xff0c;props的声明发生了一些改变&#xff1a; 使用props选项来声明props。之前的版本中使用props属性来声明&#xff0c;但在Vue 3中改为使用props选项。通过TypeScript或Flow来静态类型检查props。Vue 3允许…

关于智能指针

实现自己的智能指针 //智能指针 保证能做到资源的自动释放 //利用栈上的对象出作用域自动析构的特征&#xff0c;来做到资源的自动释放的 template<typename T> class CSmartPtr { public:CSmartPtr(T *ptr nullptr):mptr(ptr) {}~CSmartPtr() { delete mptr; } privat…

Spring实现事务(一)

Spring事务 .什么是事务事务的操作Spring中事务的实现准备工作创建表创建项目,引入Spring Web, Mybatis, mysql等依赖配置文件实体类 编程式事务(手动写代码操作事务)声明式事务(利用注解自动开启和提交事务) . 什么是事务 事务是⼀组操作的集合, 是⼀个不可分割的操作 在我们…

国产校准件

国产校准件 Ceyear系列校准件是矢量网络分析仪的测试附件&#xff0c;可大幅提高矢量网络分析仪的测试精度。规格品种丰富&#xff0c;涵盖多种同轴、波导校准件&#xff0c;校准精度高&#xff0c;重复性好 功能特点 校准件 校准模块可实现更精准的测量&#xff0c;满足您的测…

RK3588平台开发系列讲解(视频篇)H.264码流结构介绍

文章目录 一、 码流查看工具二、 I帧、 P帧、 B帧三、序列四、GOP, 即关键帧间隔五、片和宏块沉淀、分享、成长,让自己和他人都能有所收获!😄 📢H.264码流结构介绍。 一、 码流查看工具 ① H.264码流查看工具: Elecard_streamEye、 Elecard StreamEye Tools、 Special…

020 switch多选择结构

什么是switch多选择结构 switch语句中的变量类型为char的示例 char grade A; switch (grade){case A:System.out.println("成绩为A");break;case B:System.out.println("成绩为B");break;case C:System.out.println("成绩为C");break;case D:S…

2. HarmonyOS应用开发DevEcoStudio准备-1

2. HarmonyOS应用开发DevEcoStudio准备-1 下载 DevEco Studio 进入HUAWEI DevEco Studio产品页产品页。 单击下载列表右侧的按钮&#xff0c;下载 DevEco Studio。 安装 DevEco Studio 下载完成后&#xff0c;双击下载的 deveco-studio-xxxx.exe&#xff0c;进入 DevEco St…

基于SSM的高校社团管理系统

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 我欲乘风归去 又恐琼楼玉宇 高处不胜寒 -苏轼 目录 一、项目简介 二、开发技术与环境配置 2.1 SSM框架 2.2 …

vue中使用stomp.js

简介 STOMP即Simple (or Streaming) Text Orientated Messaging Protocol&#xff0c;简单(流)文本定向消息协议&#xff0c;它提供了一个可互操作的连接格式&#xff0c;允许STOMP客户端与任意STOMP消息代理&#xff08;Broker&#xff09;进行交互。STOMP协议由于设计简单&am…

洞悉智能新纪元:从基础AI到AIGC直至GAI的深度探索

引言 随着科技发展步入快车道&#xff0c;人工智能&#xff08;Artificial Intelligence, AI&#xff09;正以前所未有的速度渗透进我们的日常生活。本篇文章将通过详实的案例分析&#xff0c;帮助读者把握基础AI的核心功能&#xff0c;领略AI生成内容&#xff08;AIGC&#x…

使用STM32的FMC/FSMC接口实现多路数据传输和并发操作的设计与应用

在基于STM32的系统中&#xff0c;FMC&#xff08;Flexible Memory Controller&#xff09;/FSMC&#xff08;Flexible Static Memory Controller&#xff09;接口可以用于实现多路数据传输和并发操作。通过合理的设计和应用&#xff0c;我们可以提高系统的数据处理速度和效率。…

C++(20):通过concept及nlohmann将数据转换为字符串

nlohmann可以自动兼容将C++的很多原生类型转换为json,甚至自定义类型也不需要太复杂的操作就可以转换为json,可以利用这一点将数据转换为string: #include <nlohmann/json.hpp> #include <string> #include <vector> #include <tuple> #include <…

P1228 地毯填补问题

地毯填补问题 题目描述 相传在一个古老的阿拉伯国家里&#xff0c;有一座宫殿。宫殿里有个四四方方的格子迷宫&#xff0c;国王选择驸马的方法非常特殊&#xff0c;也非常简单&#xff1a;公主就站在其中一个方格子上&#xff0c;只要谁能用地毯将除公主站立的地方外的所有地…

关键字:extern ;什么时候类 对象 方法 定义在.h中;

2 关键字:extern 结论就是 严格是extern应该和头文件一起使用, 但是项目简单就可以直接使用? 在简单的项目或者临时的测试代码中&#xff0c;直接使用extern关键字而不通过头文件来声明外部函数或变量是可以的&#xff0c;这种情况下可能会更快捷一些。但是&#xff0c;即使在…

数据可视化 pycharts实现时间数据可视化

自用版 数据格式为&#xff1a; 运行效果为&#xff1a; from pyecharts import options as opts from pyecharts.charts import Polar, Page import csv filename "./hot-dog-places.csv" data_x [] data_y [] with open(filename) as f:reader csv.reade…

前端使用cache storage实现远程图片缓存

Cache Storage 的主要特点和用途 缓存网络资源&#xff1a;可以将经常访问的网络资源缓存到 Cache Storage 中&#xff0c;以提高网页加载速度&#xff0c;减少网络请求。离线访问&#xff1a;当用户处于离线状态时&#xff0c;可以使用 Cache Storage 中的缓存资源来加载网页…

【系统设计】12306架构设计难点(下)

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

实现两栏布局和三栏布局的多种详细方法

目录 一、背景两栏布局三栏布局 二、两栏布局flex弹性布局 三、三栏布局两边使用 float&#xff0c;中间使用 margin两边使用 absolute&#xff0c;中间使用 margin两边使用 float 和负 margin使用 display: table 实现使用flex实现grid网格布局 参考文献 一、背景 在日常布局…

永磁同步电机速度环闭环控制

文章目录 1、速度环分析2、电机参数3、PI计算4、模型仿真4.1 模型总览4.2 实际转速与参考转速对比4.3 转矩波形4.4 相电流采样波形 模型下载地址&#xff1a; 链接: 速度闭环模型&#xff08;速度电流双闭环&#xff09; 1、速度环分析 2、电机参数 Udc24 V Rs0.6 LdLq1.4e-3…