A Look at High Availability in PowerJob Server

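This article examines how PowerJob Server achieves high availability. It follows the path from worker startup to server election.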

PowerJobSpringWorker

tech/powerjob/worker/PowerJobSpringWorker.java

```java
public class PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, DisposableBean {

    /**
     * Composition over inheritance: hold a PowerJobWorker and reset the ProcessorFactory list internally, which is cleaner.
     */
    private PowerJobWorker powerJobWorker;
    private final PowerJobWorkerConfig config;

    public PowerJobSpringWorker(PowerJobWorkerConfig config) {
        this.config = config;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        powerJobWorker = new PowerJobWorker(config);
        powerJobWorker.init();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);
        BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);

        // append BuiltInSpringProcessorFactory
        List<ProcessorFactory> processorFactories = Lists.newArrayList(
                Optional.ofNullable(config.getProcessorFactoryList()).orElse(Collections.emptyList()));
        processorFactories.add(springProcessorFactory);
        processorFactories.add(springMethodProcessorFactory);
        config.setProcessorFactoryList(processorFactories);
    }

    @Override
    public void destroy() throws Exception {
        powerJobWorker.destroy();
    }
}
```

PowerJobSpringWorker implements InitializingBean. Its afterPropertiesSet creates the PowerJobWorker and then calls the worker's init method.
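The worker can see the Spring-aware factories because of callback order. Spring invokes ApplicationContextAware.setApplicationContext before InitializingBean.afterPropertiesSet. As a result, the factories appended to the config in the first callback are already present when the second callback constructs PowerJobWorker. The plain-Java sketch below simulates that order without Spring. SketchConfig, SketchWorker and SketchSpringWorker are hypothetical stand-ins, and the factory objects are reduced to their class names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for PowerJobWorkerConfig: only the processor factory list is modeled.
class SketchConfig {
    private List<String> processorFactoryList; // may be null if the user configured none

    List<String> getProcessorFactoryList() { return processorFactoryList; }

    void setProcessorFactoryList(List<String> list) { this.processorFactoryList = list; }
}

// Hypothetical stand-in for PowerJobWorker: copies the factories visible at construction time.
class SketchWorker {
    final List<String> factories;

    SketchWorker(SketchConfig config) {
        this.factories = new ArrayList<>(config.getProcessorFactoryList());
    }
}

// Mirrors PowerJobSpringWorker's two callbacks, with factory objects replaced by names.
class SketchSpringWorker {
    private final SketchConfig config;
    SketchWorker worker;

    SketchSpringWorker(SketchConfig config) { this.config = config; }

    // Like setApplicationContext: keep the user's factories (or none) and append the Spring-aware ones.
    void setApplicationContext() {
        List<String> factories = new ArrayList<>(
                Optional.ofNullable(config.getProcessorFactoryList()).orElse(Collections.emptyList()));
        factories.add("BuiltInSpringProcessorFactory");
        factories.add("BuildInSpringMethodProcessorFactory");
        config.setProcessorFactoryList(factories);
    }

    // Like afterPropertiesSet: the worker is created only now, so it sees the appended factories.
    void afterPropertiesSet() { worker = new SketchWorker(config); }
}
```

If the two calls were swapped, the worker would be built from a null or incomplete list. That is why the real class builds PowerJobWorker in afterPropertiesSet rather than in its constructor.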

PowerJobWorker.init

tech/powerjob/worker/PowerJobWorker.java

```java
public void init() throws Exception {

    if (!initialized.compareAndSet(false, true)) {
        log.warn("[PowerJobWorker] please do not repeat the initialization");
        return;
    }

    Stopwatch stopwatch = Stopwatch.createStarted();
    log.info("[PowerJobWorker] start to initialize PowerJobWorker...");

    PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
    CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");

    ServerDiscoveryService serverDiscoveryService = new PowerJobServerDiscoveryService(config);
    workerRuntime.setServerDiscoveryService(serverDiscoveryService);

    try {
        PowerBannerPrinter.print();

        // Validate appName
        WorkerAppInfo appInfo = serverDiscoveryService.assertApp();
        workerRuntime.setAppInfo(appInfo);

        // Initialize network info, distinguishing the reported address from the local bind address (the reported address is what is exposed externally)
        String localBindIp = NetUtils.getLocalHost();
        int localBindPort = config.getPort();
        String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp);
        String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort));
        log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort);
        workerRuntime.setWorkerAddress(Address.toFullAddress(externalIp, Integer.parseInt(externalPort)));

        // Initialize thread pools
        final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
        workerRuntime.setExecutorManager(executorManager);

        // Initialize the ProcessorLoader
        ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
        workerRuntime.setProcessorLoader(processorLoader);

        // Initialize actors
        TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
        ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
        WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);

        // Initialize the communication engine
        EngineConfig engineConfig = new EngineConfig()
                .setType(config.getProtocol().name())
                .setServerType(ServerType.WORKER)
                .setBindAddress(new Address().setHost(localBindIp).setPort(localBindPort))
                .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));

        EngineOutput engineOutput = remoteEngine.start(engineConfig);
        workerRuntime.setTransporter(engineOutput.getTransporter());

        // Connect to the server
        serverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor());

        log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");

        // Initialize the logging system
        OmsLogHandler omsLogHandler = new OmsLogHandler(workerRuntime.getWorkerAddress(), workerRuntime.getTransporter(), serverDiscoveryService);
        workerRuntime.setOmsLogHandler(omsLogHandler);

        // Initialize storage
        TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
        taskPersistenceService.init();
        workerRuntime.setTaskPersistenceService(taskPersistenceService);
        log.info("[PowerJobWorker] local storage initialized successfully.");

        // Initialize scheduled tasks
        workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);
        workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

        log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
    } catch (Exception e) {
        log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
        throw e;
    }
}
```

PowerJobWorker.init calls serverDiscoveryService.timingCheck(workerRuntime.getExecutorManager().getCoreExecutor()). This schedules the periodic server check on the worker's core executor.

timingCheck

tech/powerjob/worker/background/discovery/PowerJobServerDiscoveryService.java

```java
public void timingCheck(ScheduledExecutorService timingPool) {
    this.currentServerAddress = discovery();
    if (StringUtils.isEmpty(this.currentServerAddress) && !config.isAllowLazyConnectServer()) {
        throw new PowerJobException("can't find any available server, this worker has been quarantined.");
    }
    // This must be guaranteed to succeed
    timingPool.scheduleAtFixedRate(() -> {
        try {
            this.currentServerAddress = discovery();
        } catch (Exception e) {
            log.error("[PowerDiscovery] fail to discovery server!", e);
        }
    }, 10, 10, TimeUnit.SECONDS);
}
```

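PowerJobServerDiscoveryService.timingCheck first runs discovery() once, synchronously. If no server is found and allowLazyConnectServer is disabled, it throws, and worker startup fails. It then uses timingPool to run discovery() every 10 s (after an initial 10 s delay), refreshing the worker's current server address. The try/catch inside the task is essential. Under the ScheduledExecutorService contract, if one run of a periodic task throws, all later runs are suppressed.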
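Here is a minimal sketch of the same pattern using a plain ScheduledExecutorService. It performs one synchronous discovery that can fail fast, followed by a periodic refresh whose body never lets an exception escape. TimingCheckSketch and the Supplier standing in for discovery() are hypothetical. The period is a parameter here (PowerJob hard-codes 10 s) so the behavior can be observed quickly.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Minimal sketch of timingCheck: one synchronous discovery, then a periodic refresh.
// The Supplier is a hypothetical stand-in for discovery(); it returns null/empty when no server is found.
class TimingCheckSketch {
    private final Supplier<String> discovery;
    private final boolean allowLazyConnectServer;
    // volatile: written by the timer thread, read by other threads
    private volatile String currentServerAddress;

    TimingCheckSketch(Supplier<String> discovery, boolean allowLazyConnectServer) {
        this.discovery = discovery;
        this.allowLazyConnectServer = allowLazyConnectServer;
    }

    void timingCheck(ScheduledExecutorService timingPool, long periodMs) {
        currentServerAddress = discovery.get();
        if (isEmpty(currentServerAddress) && !allowLazyConnectServer) {
            // Fail fast: refuse to start without any reachable server.
            throw new IllegalStateException("can't find any available server");
        }
        timingPool.scheduleAtFixedRate(() -> {
            try {
                currentServerAddress = discovery.get();
            } catch (Exception e) {
                // Swallow and log: letting this escape would cancel every later refresh.
                System.err.println("fail to discover server: " + e);
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    String currentServerAddress() { return currentServerAddress; }

    private static boolean isEmpty(String s) { return s == null || s.isEmpty(); }
}
```

In the usage check below, the second discovery throws, yet the refresh keeps running and the address is updated again on later runs.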

discovery

```java
private String discovery() {

    // appId can only be empty when lazy-connect mode is allowed. Before each discovery, try to fetch appInfo again. Since this is the lazy-loading path, exceptions are ignored entirely here
    if (appInfo.getAppId() == null || appInfo.getAppId() < 0) {
        try {
            assertApp0();
        } catch (Exception e) {
            log.warn("[PowerDiscovery] assertAppName in discovery stage failed, msg: {}", e.getMessage());
            return null;
        }
    }

    if (ip2Address.isEmpty()) {
        config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
    }

    String result = null;

    // Send the request to the current server first
    String currentServer = currentServerAddress;
    if (!StringUtils.isEmpty(currentServer)) {
        String ip = currentServer.split(":")[0];
        // Calling the current server's HTTP service directly saves one network hop and reduces load on the server
        String firstServerAddress = ip2Address.get(ip);
        if (firstServerAddress != null) {
            result = acquire(firstServerAddress);
        }
    }

    for (String httpServerAddress : config.getServerAddress()) {
        if (StringUtils.isEmpty(result)) {
            result = acquire(httpServerAddress);
        } else {
            break;
        }
    }

    if (StringUtils.isEmpty(result)) {
        log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");

        // Given a highly available server cluster, several consecutive failures mean this node has lost contact with the outside world. The server has already moved second-level jobs to other workers, so the local ones must be killed
        if (FAILED_COUNT++ > MAX_FAILED_COUNT) {

            log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
            List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
            if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                frequentInstanceIds.forEach(instanceId -> {
                    HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                    taskTracker.destroy();
                    log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                });
            }

            FAILED_COUNT = 0;
        }
        return null;
    } else {
        // Reset the failure count
        FAILED_COUNT = 0;
        log.debug("[PowerDiscovery] current server is {}.", result);
        return result;
    }
}
```

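discovery first queries the configured HTTP address on the same host as the current server. The match is by IP only, presumably because the current server address is a protocol address whose port can differ from the configured HTTP port. It then walks the configured serverAddress list in order, calling the server-side acquire endpoint until one server returns an available server. If every attempt fails, a consecutive-failure counter is incremented. Once it exceeds MAX_FAILED_COUNT, the worker considers itself cut off and destroys all local frequent (second-level) task trackers, because the servers will already have moved those jobs to other workers. Any success resets the counter.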
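The ordering and failure-counting logic of discovery() can be isolated from networking. In this sketch, DiscoveryOrderSketch, the acquire function and the onIsolated callback are hypothetical. The acquire function stands in for the HTTP call to a server's acquire endpoint, and onIsolated stands in for destroying the frequent task trackers. The limit of 3 used in the check is chosen for illustration. The sketch also exposes one detail of the original: because FAILED_COUNT++ > MAX_FAILED_COUNT uses a post-increment, the kill path fires on the (MAX_FAILED_COUNT + 2)-th consecutive failure.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Minimal sketch of discovery()'s ordering and failure counting, without networking.
class DiscoveryOrderSketch {
    private final List<String> configuredServers;   // the worker's serverAddress list, "ip:httpPort"
    private final Function<String, String> acquire; // hypothetical: null or empty means the call failed
    private final int maxFailedCount;
    private final Runnable onIsolated;              // hypothetical: "kill all local frequent jobs"
    private final Map<String, String> ip2Address = new HashMap<>();
    private int failedCount = 0;

    DiscoveryOrderSketch(List<String> configuredServers, Function<String, String> acquire,
                         int maxFailedCount, Runnable onIsolated) {
        this.configuredServers = configuredServers;
        this.acquire = acquire;
        this.maxFailedCount = maxFailedCount;
        this.onIsolated = onIsolated;
    }

    String discover(String currentServer) {
        if (ip2Address.isEmpty()) {
            configuredServers.forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
        // 1. Prefer the configured HTTP address on the current server's host (matched by IP only).
        if (!isEmpty(currentServer)) {
            String first = ip2Address.get(currentServer.split(":")[0]);
            if (first != null) {
                result = acquire.apply(first);
            }
        }
        // 2. Otherwise walk the configured list in order until one server answers.
        for (String address : configuredServers) {
            if (!isEmpty(result)) {
                break;
            }
            result = acquire.apply(address);
        }
        if (isEmpty(result)) {
            // Post-increment, as in the original: compares the count before this failure.
            if (failedCount++ > maxFailedCount) {
                onIsolated.run();
                failedCount = 0;
            }
            return null;
        }
        failedCount = 0; // any success resets the streak
        return result;
    }

    private static boolean isEmpty(String s) { return s == null || s.isEmpty(); }
}
```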

acquireServer

tech/powerjob/server/web/controller/ServerController.java

```java
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
    return ResultDTO.success(serverElectionService.elect(request));
}
```

ServerController exposes the acquire endpoint, which simply returns the result of serverElectionService.elect(request).

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

```java
public String elect(ServerDiscoveryRequest request) {
    if (!accurate()) {
        final String currentServer = request.getCurrentServer();
        // If it is this machine, skip the expensive database lookup and return success directly
        Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
        if (localProtocolInfoOpt.isPresent()) {
            if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());
                return currentServer;
            }
        }
    }
    return getServer0(request);
}
```

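ServerElectionService.elect starts with a shortcut. Unless accurate() returns true, it checks whether the currentServer carried in the request equals this node's own bind or external address for the requested protocol. If so, it returns that address immediately, without touching the database. Otherwise it delegates to getServer0.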
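The shortcut in elect is a pure comparison, sketched below with hypothetical types. LocalProtocolInfo stands in for ProtocolInfo, the map stands in for transportService.allProtocols(), and the Supplier stands in for getServer0(request). The protocol key "HTTP" and the addresses in the check are assumptions for illustration.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for ProtocolInfo: the bind address and the externally reported address.
final class LocalProtocolInfo {
    final String address;
    final String externalAddress;

    LocalProtocolInfo(String address, String externalAddress) {
        this.address = address;
        this.externalAddress = externalAddress;
    }
}

final class ElectShortcutSketch {
    static String elect(boolean accurate, String currentServer, String protocol,
                        Map<String, LocalProtocolInfo> localProtocols, Supplier<String> fullElection) {
        if (!accurate) {
            LocalProtocolInfo local = localProtocols.get(protocol);
            if (local != null && (local.externalAddress.equals(currentServer) || local.address.equals(currentServer))) {
                // The worker already talks to this node: no database read, no ping.
                return currentServer;
            }
        }
        // Anything else goes through the full election (getServer0 in PowerJob).
        return fullElection.get();
    }
}
```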

getServer0

```java
private String getServer0(ServerDiscoveryRequest discoveryRequest) {

    final Long appId = discoveryRequest.getAppId();
    final String protocol = discoveryRequest.getProtocol();
    Set<String> downServerCache = Sets.newHashSet();

    for (int i = 0; i < RETRY_TIMES; i++) {

        // Read the current server from the database without a lock
        Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
        if (!appInfoOpt.isPresent()) {
            throw new PowerJobException(appId + " is not registered!");
        }
        String appName = appInfoOpt.get().getAppName();
        String originServer = appInfoOpt.get().getCurrentServer();
        String activeAddress = activeAddress(originServer, downServerCache, protocol);
        if (StringUtils.isNotEmpty(activeAddress)) {
            return activeAddress;
        }

        // No server available: re-run the server election, which requires a lock
        String lockName = String.format(SERVER_ELECT_LOCK, appId);
        boolean lockStatus = lockService.tryLock(lockName, 30000);
        if (!lockStatus) {
            try {
                Thread.sleep(500);
            } catch (Exception ignore) {
            }
            continue;
        }
        try {

            // Another machine may have just finished the election, so check again
            AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
            String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
            if (StringUtils.isNotEmpty(address)) {
                return address;
            }

            // Take over: if this machine supports the protocol, it becomes the server that schedules this worker
            final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
            if (targetProtocolInfo != null) {
                // Note: AppInfoDO#currentServer always stores the default protocol's bind address; it is converted to the protocol address only when returned
                appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                appInfo.setGmtModified(new Date());

                appInfoRepository.saveAndFlush(appInfo);
                log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                return targetProtocolInfo.getExternalAddress();
            }
        } catch (Exception e) {
            log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
        } finally {
            lockService.unlock(lockName);
        }
    }
    throw new PowerJobException("server elect failed for app " + appId);
}
```

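getServer0 retries up to RETRY_TIMES (10) times. Each round first reads, without a lock, the currentServer recorded in the database for the app (AppInfoDO#currentServer, not the currentServer sent by the worker). It checks that server with activeAddress and returns its address if the server is alive. If no live server is found, it tries the per-app election lock. When the lock is held elsewhere, it sleeps 500 ms and retries. Once it holds the lock, it re-reads the database, because another server may have just won the election. If there is still no live server, the local node takes over. It writes its default-protocol address into AppInfoDO#currentServer and returns its external address for the requested protocol. So when a new server is needed, the node handling the request is preferred.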
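The check, lock, double-check and take-over loop of getServer0 can be reproduced in a single JVM. In this minimal sketch, ElectionSketch is hypothetical. An AtomicReference plays the AppInfoDO#currentServer column. A non-blocking ReentrantLock.tryLock() plays lockService.tryLock, and its 30000 ms argument is not modeled. A BiFunction plays activeAddress. Unlike the real code, the sketch stores and returns the same address, rather than writing the default-protocol address and returning the protocol's external address.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;

// Minimal single-JVM sketch of getServer0's election loop (all collaborators are stand-ins).
final class ElectionSketch {
    static final int RETRY_TIMES = 10;

    final AtomicReference<String> dbCurrentServer; // plays AppInfoDO#currentServer
    private final ReentrantLock electLock = new ReentrantLock(); // plays the per-app DB lock
    private final String selfAddress;
    // Plays activeAddress: returns null (and may record the address as down) when the server is dead.
    private final BiFunction<String, Set<String>, String> activeAddress;

    ElectionSketch(String selfAddress, String recordedServer,
                   BiFunction<String, Set<String>, String> activeAddress) {
        this.selfAddress = selfAddress;
        this.dbCurrentServer = new AtomicReference<>(recordedServer);
        this.activeAddress = activeAddress;
    }

    String getServer0() {
        Set<String> downServerCache = new HashSet<>(); // lives for this election only
        for (int i = 0; i < RETRY_TIMES; i++) {
            // 1. Lock-free read: is the recorded server still alive?
            String active = activeAddress.apply(dbCurrentServer.get(), downServerCache);
            if (active != null) {
                return active;
            }
            // 2. No live server: only the lock holder may elect a new one.
            if (!electLock.tryLock()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                continue;
            }
            try {
                // 3. Double-check: another node may have finished the election meanwhile.
                String again = activeAddress.apply(dbCurrentServer.get(), downServerCache);
                if (again != null) {
                    return again;
                }
                // 4. Take over: record this node as the app's server.
                dbCurrentServer.set(selfAddress);
                return selfAddress;
            } finally {
                electLock.unlock();
            }
        }
        throw new IllegalStateException("server elect failed");
    }
}
```

The double-check in step 3 stops two servers that both saw a dead owner from overwriting each other. Whichever acquires the lock second finds the winner alive and returns it instead of taking over.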

activeAddress

```java
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {

    if (downServerCache.contains(serverAddress)) {
        return null;
    }
    if (StringUtils.isEmpty(serverAddress)) {
        return null;
    }

    Ping ping = new Ping();
    ping.setCurrentTime(System.currentTimeMillis());

    URL targetUrl = ServerURLFactory.ping2Friend(serverAddress);
    try {
        AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class)
                .toCompletableFuture()
                .get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (response.isSuccess()) {
            // The check passed against the remote server's exposed address; return the protocol address the worker needs
            final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
            if (protocolInfo != null) {
                downServerCache.remove(serverAddress);
                ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);
                log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);
                // During an upgrade from 4.3.3 to 4.3.4, servers not yet upgraded have no externalAddress, so fall back to address for compatibility
                return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress());
            } else {
                log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);
            }
        }
    } catch (TimeoutException te) {
        log.warn("[ServerElection] server[{}] was down due to ping timeout!", serverAddress);
    } catch (Exception e) {
        log.warn("[ServerElection] server[{}] was down with unknown case!", serverAddress, e);
    }
    downServerCache.add(serverAddress);
    return null;
}
```

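activeAddress pings the target server over HTTP and waits at most PING_TIMEOUT_MS (1 s) for the reply. If the target server is down, the wait times out and get throws a TimeoutException. A timeout, any other exception, an unsuccessful response, or a response missing the requested protocol all cause the server to be added to downServerCache and null to be returned. On success, the server is removed from downServerCache. The remote server's address for the worker's protocol is returned, falling back to address for 4.3.3 servers that lack externalAddress. Because downServerCache is created per getServer0 call, a server found to be down is not pinged again in later retries of the same election.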
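Below is a minimal sketch of the bounded ping and the down cache, using CompletableFuture.get(timeout, unit) as the original does. PingCheckSketch and the ping function are hypothetical. The ping function completes with the address the server advertises for the requested protocol, or with null when the server lacks that protocol. This collapses the original's JSON parsing and its externalAddress/address fallback into one step. The timeout is a parameter here, whereas the article gives 1 s for PING_TIMEOUT_MS.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Minimal sketch of activeAddress's bounded ping and per-election down cache.
final class PingCheckSketch {
    static String activeAddress(String serverAddress, Set<String> downServerCache,
                                Function<String, CompletableFuture<String>> ping, long timeoutMs) {
        if (serverAddress == null || serverAddress.isEmpty() || downServerCache.contains(serverAddress)) {
            return null; // nothing to check, or already known to be down in this election
        }
        try {
            String protocolAddress = ping.apply(serverAddress).get(timeoutMs, TimeUnit.MILLISECONDS);
            if (protocolAddress != null) {
                downServerCache.remove(serverAddress);
                return protocolAddress;
            }
        } catch (TimeoutException te) {
            System.err.println("server " + serverAddress + " was down due to ping timeout");
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } catch (Exception e) {
            System.err.println("server " + serverAddress + " was down: " + e);
        }
        // Timeout, error or missing protocol: remember the server as down for this election.
        downServerCache.add(serverAddress);
        return null;
    }
}
```

A server that never answers costs one timeout per election. After that, the cache makes later retries in the same getServer0 call skip it immediately.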

Summary

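When a PowerJob worker initializes, it starts a scheduled task that runs discovery() every 10 s to refresh the worker's server address. discovery() asks the current server first and then walks the configured serverAddress list, calling the server-side acquire endpoint until one returns an available server. ServerController's acquire endpoint delegates to serverElectionService.elect(request). Unless accurate() is set, elect returns at once when the worker's current server is the local node; otherwise it calls getServer0. getServer0 retries up to 10 times. It checks with activeAddress whether the server recorded in the database for the app is alive and returns it if so. If not, it takes the election lock, re-checks, and lets the local node take over as the new server. activeAddress pings the target server with a 1 s timeout. A server that times out or fails is added to downServerCache, and one that answers successfully is removed from it.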

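worker scheduled task --> poll serverAddress with acquire requests --> the server checks whether the target server answers a ping; if not, it takes the lock and, preferably, uses itself as the replacement server.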

