聊聊PowerJob的IdGenerateService

本文主要研究一下PowerJob的IdGenerateService

IdGenerateService

tech/powerjob/server/core/uid/IdGenerateService.java

@Slf4j
@Service
public class IdGenerateService {private final SnowFlakeIdGenerator snowFlakeIdGenerator;private static final int DATA_CENTER_ID = 0;public IdGenerateService(ServerInfoService serverInfoService) {long id = serverInfoService.fetchServiceInfo().getId();snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);}/*** 分配分布式唯一ID* @return 分布式唯一ID*/public long allocate() {return snowFlakeIdGenerator.nextId();}}

IdGenerateService的构造器接收ServerInfoService,然后通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId()

ServerInfoService

tech/powerjob/server/remote/server/self/ServerInfoService.java

public interface ServerInfoService {/*** fetch current server info* @return ServerInfo*/ServerInfo fetchServiceInfo();}

ServerInfoService定义了fetchServiceInfo方法,返回ServerInfo

ServerInfoServiceImpl

tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java

@Slf4j
@Service
public class ServerInfoServiceImpl implements ServerInfoService {private final ServerInfo serverInfo;private final ServerInfoRepository serverInfoRepository;private static final long MAX_SERVER_CLUSTER_SIZE = 10000;private static final String SERVER_INIT_LOCK = "server_init_lock";private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;@Autowiredpublic ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {this.serverInfo = new ServerInfo();String ip = NetUtils.getLocalHost();serverInfo.setIp(ip);serverInfo.setBornTime(System.currentTimeMillis());this.serverInfoRepository = serverInfoRepository;Stopwatch sw = Stopwatch.createStarted();while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) {log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK);CommonUtils.easySleep(100);}try {// register server then get server_idServerInfoDO server = serverInfoRepository.findByIp(ip);if (server == null) {ServerInfoDO newServerInfo = new ServerInfoDO(ip);server = serverInfoRepository.saveAndFlush(newServerInfo);} else {serverInfoRepository.updateGmtModifiedByIp(ip, new Date());}if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {serverInfo.setId(server.getId());} else {long retryServerId = retryServerId();serverInfo.setId(retryServerId);serverInfoRepository.updateIdByIp(retryServerId, ip);}} catch (Exception e) {log.error("[ServerInfoService] init server failed", e);throw e;} finally {lockService.unlock(SERVER_INIT_LOCK);}log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);}@Scheduled(fixedRate = 15000, initialDelay = 15000)public void heartbeat() {serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());}private long retryServerId() {List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll();log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size());// clean inactive server record firstif (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {// use a large time interval to prevent valid records from being deleted when the local time is inaccurateDate oneDayAgo = DateUtils.addDays(new Date(), -1);int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo);log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo);serverInfoList = serverInfoRepository.findAll();}if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size()));}Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet());for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) {if (uedServerIds.contains(i)) {continue;}log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i);return i;}throw new PowerJobException("impossible");}@Autowired(required = false)public void setBuildProperties(BuildProperties buildProperties) {if (buildProperties == null) {return;}String pomVersion = buildProperties.getVersion();if (StringUtils.isNotBlank(pomVersion)) {serverInfo.setVersion(pomVersion);}}@Overridepublic ServerInfo fetchServiceInfo() {return serverInfo;}
}

ServerInfoServiceImpl实现了ServerInfoService接口,其构造器注入lockService和serverInfoRepository,先通过lockService.tryLock抢到server_init_lock,然后serverInfoRepository.findByIp找到ServerInfoDO执行saveAndFlush或者updateGmtModifiedByIp;其fetchServiceInfo返回的是serverInfo信息;它还以fixedRate为15s调度了heartbeat,主要是更新gmtModifed

SnowFlakeIdGenerator

tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java

public class SnowFlakeIdGenerator {/*** 起始的时间戳(a special day for me)*/private final static long START_STAMP = 1555776000000L;/*** 序列号占用的位数*/private final static long SEQUENCE_BIT = 6;/*** 机器标识占用的位数*/private final static long MACHINE_BIT = 14;/*** 数据中心占用的位数*/private final static long DATA_CENTER_BIT = 2;/*** 每一部分的最大值*/private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);/*** 每一部分向左的位移*/private final static long MACHINE_LEFT = SEQUENCE_BIT;private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;/*** 数据中心*/private final long dataCenterId;/*** 机器标识*/private final long machineId;/*** 序列号*/private long sequence = 0L;/*** 上一次时间戳*/private long lastTimestamp = -1L;public SnowFlakeIdGenerator(long dataCenterId, long machineId) {if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");}if (machineId > MAX_MACHINE_NUM || machineId < 0) {throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");}this.dataCenterId = dataCenterId;this.machineId = machineId;}/*** 产生下一个ID*/public synchronized long nextId() {long currStamp = getNewStamp();if (currStamp < lastTimestamp) {return futureId();}if (currStamp == lastTimestamp) {//相同毫秒内,序列号自增sequence = (sequence + 1) & MAX_SEQUENCE;//同一毫秒的序列数已经达到最大if (sequence == 0L) {currStamp = getNextMill();}} else {//不同毫秒内,序列号置为0sequence = 0L;}lastTimestamp = currStamp;return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分| dataCenterId << DATA_CENTER_LEFT       //数据中心部分| machineId << MACHINE_LEFT             //机器标识部分| sequence;                             //序列号部分}/*** 发生时钟回拨时借用未来时间生成Id,避免运行过程中任务调度和工作流直接进入不可用状态* 注:该方式不可解决原算法中停服状态下时钟回拨导致的重复id问题*/private long futureId() {sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == 0L) {lastTimestamp = lastTimestamp + 1;}return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分| dataCenterId << DATA_CENTER_LEFT       //数据中心部分| machineId << MACHINE_LEFT             //机器标识部分| sequence;                             //序列号部分}private long getNextMill() {long mill = getNewStamp();while (mill <= lastTimestamp) {mill = getNewStamp();}return mill;}private long getNewStamp() {return System.currentTimeMillis();}
}

SnowFlakeIdGenerator的dataCenterId(最大值为3)和machineId(最大值为16383),sequence最大值为63

小结

PowerJob的IdGenerateService通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId();其InstanceInfoDO的instanceId就是idGenerateService.allocate()生成的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610177.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue-11、Vue计算属性

Vue计算属性是Vue实例的属性&#xff0c;用来根据已有的数据进行计算得到新的数据。计算属性的值会根据它的依赖缓存起来&#xff0c;在依赖没有发生改变时直接返回缓存的值&#xff0c;提高了性能。 计算属性的定义方式为在Vue实例中使用computed关键字&#xff0c;并将计算属…

Guava:Range 区间范围工具

简介 Range 表示一个间隔或一个序列。它被用于获取一组数字/串在一个特定范围之内。可比较类型的区间API&#xff0c;包括连续和离散类型。 Range 定义了连续跨度的范围边界&#xff0c;这个连续跨度是一个可以比较的类型(Comparable type)。比如1到100之间的整型数据。 在数…

安全防御之备份恢复技术

随着计算机和网络的不断普及&#xff0c;人们更多的通过网络来传递大量信息。在网络环境下&#xff0c;还有各种各样的病毒感染、系统故障、线路故障等&#xff0c;使得数据信息的安全无法得到保障。由于安全风险的动态性&#xff0c;安全不是绝对的&#xff0c;信息系统不可能…

【Unity】UniTask(异步工具)快速上手

UniTask(异步工具) 官方文档&#xff1a;https://github.com/Cysharp/UniTask/blob/master/README_CN.md URL:https://github.com/Cysharp/UniTask.git?pathsrc/UniTask/Assets/Plugins/UniTask 优点&#xff1a;0GC&#xff0c;可以在任何地方使用 为Unity提供一个高性能&…

(Matlab)基于CNN-GRU的多输入分类(卷积神经网络-门控循环单元网络)

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、部分程序&#xff1a; 四、完整代码数据分享&#xff1a; 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于Matlab平台编译…

雍禾医疗启动“毛发森林”公益计划 为地球种植“发际线”

不久前&#xff0c;由雍禾植发、新华网及中国绿化基金会三方共同发起的 “毛发森林地球生发计划”在内蒙古自治区阿拉善盟额济纳旗揭牌启动,此计划将陆续在西部严重沙化地区植下十万棵梭梭树改善荒漠化地区环境。 据悉,早在2023年3月,雍禾植发就主动与新华网一起启动“让美好生…

Rabbitmq 消息可靠性保证

1、简介 消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功&#xff0c;本文详细介绍两个环节的消息可靠性传递方式&#xff1a;1&#xff09;、消息传递到交换机的 confirm 模式&#xff1b;2&#xff09;、消息传递到队列的 Return 模式。 消息从 producer 到 ex…

Msa全球最新研究:多系统萎缩特效药全球最新进展?

多系统萎缩&#xff0c;是一种以神经系统为主的遗传性疾病&#xff0c;典型症状表现为运动障碍、自主神经功能障碍和认知障碍等。对于这种疾病&#xff0c;西医目前尚未有明确的根治办法&#xff0c;大多数医生只能通过药物缓解患者的症状&#xff0c;但无法彻底治愈。 然而&a…

轮排索引相关知识

轮排索引&#xff08;Round Robin Index&#xff09;是一种数据结构&#xff0c;通常用于分布式系统中的索引管理。 基本思想是将索引分为多个块&#xff0c;并循环地轮流在这些块中存储键值对&#xff0c;以实现负载均衡。这样可以防止某个特定块的负载过重&#xff0c;使得系…

C#的list进行升序或者降序的排列方式(亲测有效)

C#的list进行升序或者降序的排列方式&#xff08;亲测有效&#xff09; 1.list升序排序&#xff1a;时间从小到大排列 s在前就是升序 listNew.Sort((s, x) > s.CreateTime.CompareTo(x.CreateTime));2.list降序排序&#xff1a;时间从大到小排列 x在前就是降序 listNew.So…

第9章 正则表达式

学习目标 熟悉正则表达式,能够说出正则表达式的概念和作用 掌握正则表达式的创建,能够使用两种方式创建正则表达式 掌握正则表达式的使用,能够使用正则表达式进行字符串匹配 掌握正则表达式中元字符的使用,能够根据需求选择合适的元字符 掌握正则表达式中模式修饰符的使用,…

CDN加速之HTTPS配置

记录一下HTTPS配置的免费证书配置 2张图搞定 最后补充说明&#xff1a; 由于CDN采用的Tengine服务基于Nginx&#xff0c;因此开启HTTPS安全加速功能的加速域名&#xff0c;只支持上传Nginx能读取的PEM格式的证书。如果证书不是PEM格式&#xff0c;需转换成PEM格式。转换方法&a…

C#,字符串匹配(模式搜索)KMP算法的源代码与数据可视化

D.E.Knuth J.H.Morris 一、KMP算法 KMP 算法&#xff08;Knuth-Morris-Pratt 算法&#xff09;是其中一个著名的、传统的字符串匹配算法&#xff0c;效率比较高。 KMP算法由D.E.Knuth&#xff0c;J.H.Morris和V.R.Pratt在 Brute-Force算法的基础上提出的模式匹配的改进算法。…

江山易改本性难移之ZYNQ SDK QSPI固化bug及其解决方法

之前在Vivado2018.3通过QSPI方式固化程序时出现问题&#xff0c;显示flash擦除成功&#xff0c;但最后总是不能写入到flash中。 查资料发现从VIVADO 2017.3版本开始&#xff0c;Xilinx官方为了使Zynq-7000和Zynq UltraScale 实现流程相同&#xff0c;在QSPI FLASH使用上做了变化…

基于JAVA+SpringBoot的高校学术报告系统

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 智慧高校学术报告系统…

【SpringCloud】之入门级及nacos的集成使用

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是君易--鑨&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的博客专栏《SpringCloud开发之入门级及nacos》。&#x1f3…

【Github3k+⭐️】《CogAgent: A Visual Language Model for GUI Agents》译读笔记

CogAgent: A Visual Language Model for GUI Agents 摘要 人们通过图形用户界面&#xff08;Graphical User Interfaces, GUIs&#xff09;在数字设备上花费大量时间&#xff0c;例如&#xff0c;计算机或智能手机屏幕。ChatGPT 等大型语言模型&#xff08;Large Language Mo…

免费用chatGPT

免费用chatGPT&#xff0c;地址&#xff1a; DocGPT - 第二大脑

设计模式面试

C 面向对象设计 封装&#xff1a;隐藏内部实现继承&#xff1a;复用现有代码多态&#xff1a;改写对象行为 设计模式关键在于分解和抽象; 设计模式的主要目的是易于变化 面向对象设计原则–比设计模式更加重要 违背了设计原则&#xff0c;设计模式是错误的。 依赖倒置原则…

你们做外贸主要的获客渠道有哪些?

昨天跟一个同行朋友聊天&#xff0c;他原本主打产品是做动力类的&#xff0c;这两年竞争太大&#xff0c;订单也减少了很多。为了求发展&#xff0c;就拓品了&#xff0c;而拓展的新品刚好是我们这一块&#xff0c;而且非常迅速地找到场地把生产线弄了起来&#xff0c;还不断扩…