Moka HR SaaS 招聘平台
主要负责 Moka HR SaaS 招聘平台的模块开发,参与功能设计与实现,提升系统的稳定性和用户体验。
**技术栈:**SpringCloud、Mybatis-Plus、MySQL、Redis、OAuth 2.0、CAS(SSO)、MokaBatch、Apollo等。
主要职责:
1.SSO(CAS) + OAuth 2.0
根据客户需求,定制化实现单点登录和用户授权,采用 CAS 集成 OAuth 2.0 授权码模式,提升系统安全性和用户体验。
需求:需求是这样的:当时对接的客户是携程,携程公司内部的多个子系统之间采用的是CAS方式实现单点登录的,单个子系统在CAS认证中心登陆后,其他子系统就不用重复登录了。他们使用我们的Moka招聘系统进行招聘,需求就是要打通内部系统和moka的招聘系统,这样只要携程的内部员工登录了他们的内部系统就可以直接点击moka招聘系统后跳转到moka招聘系统里面,不需要再次登录。其次对于外部的合作伙伴(第三方猎头公司的人),也需要登录moka招聘系统,帮助携程去招聘,把一些通过猎聘招的候选人在moka招聘系统导入,并同步到携程的系统。这些外部合作伙伴就不走CAS单点登录的方式,而是走OAuth2微信的授权方式进入moka招聘系统,因为猎头公司只能访问moka招聘系统而不允许访问携程内部的系统。(重要!!!!)
所以,我们moka这边的开发工作就是需要做的就是:
-
依据用户的身份来决定认证方式。
-
当用户是携程的内部员工时,Moka系统会自动通过CAS进行认证。(认证服务器) orgId:携程 方式:CAS
-
当用户是外部合作伙伴时,系统会通过OAuth 2.0进行授权和认证。(授权服务器)orgId:携程 方式:OAuth 2.0
前提:合作伙伴需要提前在携程绑定微信,表示是合作伙伴,用于授权的验证。
-
-
身份和权限管理
:你可以开发一个统一的用户管理模块,能够识别用户的来源(内部员工或外部合作伙伴),并根据他们的身份授予不同的权限。
- 内部员工可能拥有更高的权限,可以访问招聘的更多功能和数据。 合作伙伴则只允许访问他们的特定任务和功能,比如提交简历或查看候选人的状态。
具体步骤:
前置工作:需要将 认证服务器地址 和 授权服务器地址 在Moka SSO服务进行注册,用后面重定向进行CAS认证和OAuth2授权。
内部员工(CAS):
1. 用户首次访问 Moka 系统
- 用户是 携程内部员工,访问 Moka 系统时,Moka 的 SSO 服务根据
orgId
识别出该用户来自携程,并发现该用户需要通过 CAS 进行身份认证。 - SSO 服务将用户的请求重定向到 携程的 CAS 认证中心,并附上 Moka SSO服务地址(service URL),用作认证后的回调地址。
2. CAS 认证中心验证用户身份
-
用户被重定向到携程 CAS 认证中心后,CAS 系统检查用户是否已经有有效的 TGT(Ticket Granting Ticket)。
TGT 的作用:
- 长期认证凭证:当用户首次登录 CAS 时,CAS 认证中心会生成一个 TGT,并将其保存在 CAS 的服务器端。用户浏览器中会存储与 TGT 相关联的 cookie(一般称为
CASTGC
)。这个 TGT 用来表示用户已经通过了身份验证,且在 CAS 系统中是“已登录”的状态。
- 长期认证凭证:当用户首次登录 CAS 时,CAS 认证中心会生成一个 TGT,并将其保存在 CAS 的服务器端。用户浏览器中会存储与 TGT 相关联的 cookie(一般称为
-
两种情况:
- 如果用户已经登录过,且 TGT 仍然有效:CAS 会根据用户的 TGT 自动生成一个 ST(Service Ticket),并跳转回 Moka 系统(service URL),带上这个 ST。
- 如果用户没有有效的 TGT(例如首次登录或 TGT 过期):CAS 系统会要求用户输入登录凭证(如用户名和密码),通过验证后,生成一个 TGT 并存储于用户的浏览器,同时生成一个 ST,并跳转回 Moka 系统。
3. Moka 系统收到 ST(Service Ticket)
-
当用户通过 CAS 认证后,CAS 系统会生成一个 ST(Service Ticket),并将用户重定向回 Moka 系统,同时在 URL 中附上 ST。
ST 的作用:
- 短期服务票据:ST 是针对特定服务(如 Moka 系统)生成的短期票据,用于证明用户已经在 CAS 系统中通过了身份验证。ST 只能使用一次。
-
Moka 系统收到 ST 后,会向携程的 CAS 认证中心发送请求,验证这个 ST 的有效性。
4. CAS 系统验证 ST 并返回用户信息
- Moka 系统将 ST 发回给 CAS 认证中心请求验证。CAS 系统验证 ST 是否有效(确认这是 CAS 刚刚发放的有效票据,且未被使用过)。
- 如果 ST 有效:CAS 系统返回用户的身份信息(如用户名、用户ID等)给 Moka 系统。
- 如果 ST 无效:CAS 系统会返回错误,Moka 系统会要求用户重新认证。
5. 避免多次在CAS校验ST
-
为了避免每次请求都需要重新去 CAS 认证中心 验证 ST 的合法性,Moka 系统在 ST 验证成功后,会根据用户信息生成一个本地会话 token(如 JWT)。这个 token 带有一定的有效期(例如 30 分钟或 1 小时),用于在这段时间内管理用户的登录状态。用户信息会存储到 Redis,并设置过期时间。
之后,用户的每次请求都会携带这个 token,Moka 系统通过 Redis 校验 token 的有效性,避免了频繁访问 CAS 认证中心,提升了系统性能和用户体验,同时确保了用户会话的安全性。
TGT 和 ST 在这个流程中的作用总结:
- TGT(Ticket Granting Ticket):长时间存储的认证凭证,用于用户在多次请求时无需重新输入凭证。在 CAS 系统中,它在服务器端保持用户的登录状态。
- ST(Service Ticket):短期凭证,针对特定服务(如 Moka 系统)生成,用于一次性证明用户已经通过 CAS 认证。ST 只能使用一次,用完即失效。
流程图总结:
- 用户访问 Moka 系统 → Moka SSO 识别用户来源并重定向到 CAS。
- CAS 检查用户 TGT:
- 有 TGT:跳到步骤 4。
- 无 TGT:用户输入凭证并生成 TGT 和 ST。
- CAS 重定向用户回 Moka 系统,带上 ST。
- Moka 系统验证 ST,从 CAS 服务器获取用户信息。
- 用户登录成功,Moka 系统授予访问权限。
这个流程确保了携程内部员工只需在 CAS 系统中登录一次,就可以访问 Moka 系统,并且可以无缝地在携程内部的其他系统之间切换。
外部合作伙伴(OAuth 2.0):
- 外部合作伙伴(比如第三方猎头公司)通过钉钉或企业微信等外部应用访问Moka招聘系统。
- 这些合作伙伴已经在钉钉或企业微信中登录,通过OAuth 2.0授权流程,Moka系统可以获取这些用户的访问令牌(access token)。
- Moka系统通过令牌验证合作伙伴的身份,并获取他们的授权信息。
- 合作伙伴可以根据其权限在Moka系统中执行任务,如查看候选人、提交简历等,但他们不能访问携程的其他内部系统。
1.外部合作伙伴访问 Moka 系统
- 当外部合作伙伴(猎头公司员工)首次访问 Moka 招聘系统时,Moka 的 SSO 服务根据
orgId
识别出该用户是外部合作伙伴,而不是携程的内部员工。 - 由于这些用户不走携程的 CAS 单点登录系统,Moka 系统会将他们的请求重定向到 微信 OAuth 2.0 授权页面,要求用户通过微信进行登录。
2. 重定向到微信授权页面
- 用户会被重定向到微信的授权页面,微信会请求用户确认授权,允许 Moka 系统访问他们的基本信息。
- 用户登录或确认后,微信授权服务器会生成一个 授权码(Authorization Code) 并将其返回到 Moka 系统的回调 URL。
3. Moka 系统接收授权码
- Moka 系统在接收到微信返回的 授权码 后,使用这个授权码向微信的授权服务器发送请求,交换获取 访问令牌(Access Token)。
- 这个访问令牌将用于获取用户的微信账户信息,比如 微信 OpenID、昵称、头像等。
4. 获取微信用户信息
- 微信授权服务器返回访问令牌和用户的基本信息(如 OpenID)给 Moka 系统。
- Moka 系统接收这些信息并在自己的数据库中查找或创建该外部合作伙伴的账户。如果是首次登录,系统会根据微信信息创建一个新的用户。
5. 验证是否为合作伙伴及权限分配
-
Moka 系统需要确认该用户是否为携程的
合法合作伙伴
,这一验证通常通过与
携程的合作伙伴管理系统
通信实现。
- Moka 系统会向携程的合作伙伴管理系统发送请求,携带微信 OpenID 进行验证。
- 携程系统会返回用户是否是合法合作伙伴的结果,并同时返回该用户在 Moka 招聘系统中的权限。
6. 授予访问权限
-
如果携程系统确认该用户为合法合作伙伴,并返回其拥有的权限信息,Moka 系统会根据这些权限来
授予用户相应的功能访问权限
,如:
- 查看候选人状态
- 提交简历
- 编辑或导入候选人信息
-
Moka 系统生成一个本地的 token(例如 JWT),用于维护用户会话,避免频繁地请求微信授权服务器。用户信息会存储在 Redis 中,并设置过期时间。
7. 用户登录成功
- 用户登录成功后,每次请求都会带上 Moka 系统生成的 token,Moka 系统根据此 token 验证用户的登录状态及权限。
- 用户可以根据其权限访问 Moka 招聘系统的相应功能模块,执行其职责。
流程图总结:
- 用户访问 Moka 系统 → Moka SSO 服务识别用户来源并重定向到微信授权页面。
- 用户通过微信授权 → 微信服务器返回授权码。
- Moka 系统使用授权码获取访问令牌 → 微信服务器返回访问令牌和用户信息。
- Moka 系统验证用户是否为携程合作伙伴 → 携程系统返回验证结果和权限信息。
- Moka 系统生成本地 token 并授予权限 → 用户可以访问特定功能模块。
2.生态接口
开发数据同步模块的部分生态接口,支持组织人员、招聘需求、职位、候选人、会议室等数据的同步与对接。
3.慢SQL优化
由于任务日志表中 content 字段存储大体积 JSON 数据,频繁解析导致查询效率低下。我通过提取常用的 JSON 属性至新列,并逐条更新,避免批量处理对服务器的压力。起初耗时4分26秒,排查索引失效后,通过创建联合索引将查询时间缩短至9秒,结合多线程分片处理最终优化至5秒,显著提升了查询和数据处理效率。
需求背景:
我们有一个任务日志表 task,表结构中存储了大量任务的日志信息。原本,所有开发人员都能查看到所有任务的日志记录。现在有一个新的需求,需要增加权限控制,让用户只能看到自己创建的任务日志。为此,查询任务列表时必须通过 createby
字段来过滤,确保只显示当前用户的任务日志。
问题:
createby
字段目前是嵌套在content
字段中的,这是一个大 JSON 字符串。在查询时频繁解析这个 JSON,带来了明显的性能问题,查询效率非常低。
解决方案的设计:
1. 新增 createby
字段:
为了提升查询效率,我决定将 createby
字段从 content
JSON 中解耦,新增一个独立的 createby
列。在新的数据插入时,重写日志实体类的构造方法,使得每次插入时 createby
字段会直接写入到新列,而不再嵌入到 content
中。
2. 更新已有数据:
对于之前已经存在的记录,仍然需要将 content
中的 createby
提取出来,更新到新列中。由于 content
中的数据量较大,直接批量解析和更新会造成很大的性能瓶颈,因此我选择逐条解析更新。
过程中的优化步骤:
初始方案(索引问题):
一开始,我采用了一个范围查询(id)的方式来逐步更新 createby
列:
SELECT id, content FROM task WHERE createby = -1 AND id > lastId AND id < maxId;
我在测试环境下运行了这个查询,结果 10,000 条数据耗时 4 分 26 秒。于是,我通过 EXPLAIN
发现type为All,possible_keys为NULL,虽然 createBy
列有索引,但由于其值全为 -1
,选择性太低,MySQL 优化器认为使用索引无效,最终选择了全表扫描。
type: ALL
:表明 MySQL 执行了全表扫描,没有使用任何索引。
possible_keys
和 key
为 NULL,说明没有有效索引被使用。
索引优化(联合索引):
发现索引失效后,我意识到没有充分利用唯一的 id
作为索引。于是,我创建了 id
和 createBy
的联合索引,确保查询时索引不再失效。这样 MySQL 可以同时利用 id
的范围查询和 createBy
的过滤条件,进一步提升查询性能,查询时间从 4 分 26 秒缩短到了 9 秒。
CREATE INDEX idx_id_createBy ON task(id, createBy);
多线程优化(并行处理):
在时间充裕的情况下,我想进一步优化这个过程。由于任务表的数据是自增的且没有大的断层,我想到可以通过 多线程分片处理 来提升性能。
- 我先查询表中最后一行的
id
,获取最大id
值,然后将数据动态划分为多个区间。 - 每个线程负责处理一个区间内的数据,独立更新对应区域的
createby
字段。 - 这样,多个线程可以并行处理不同的数据分片,大大减少了处理时间。
通过这个多线程分片处理的方式,最终将查询和更新的时间优化到了 5 秒。
线程数的确定:
在多线程处理时,我选择的线程数主要依据服务器的 CPU 核心数,通常设定为 CPU 核心数的 1 到 1.5 倍,以充分利用并行处理能力。此外,还要考虑任务的 I/O 密集型特性,适当增加线程数以便优化 I/O 处理。
总结:
整个过程中,最开始的需求是权限控制,需要对任务日志表增加 createby
字段来提高查询效率。为此,我逐步优化了方案:
- 首先,通过将
createby
字段从 JSON 中解耦,避免每次查询时的 JSON 解析。 - 其次,通过 排查索引失效,结合
id
列创建联合索引,显著提升了查询性能。 - 最后,通过 多线程分片处理,进一步加速了整个更新过程,将耗时从几分钟优化到几秒。
整个优化过程体现了对 SQL 查询性能的深入理解以及对多线程并行处理的灵活应用,使得系统的查询和更新效率得到了大幅提升。
4.MokaBatch
使用公司自研的异步任务处理框架 MokaBatch,优化批量简历处理流程,解决大批量操作导致的超时和系统性能问题,提升系统稳定性与用户体验。
1.定位(背景):
- 以独立服务存在,解决大批量业务操作超时体验差的问题(比如:在发布热门职位时,我们会收到大量的简历,需要进行批量上传、解析、分类和筛选简历,如果使用单个节点来处理批量简历,就可能会导致前端一直在等待,甚至有些就直接超时,用户体验差。)
- 降低大批量操作对单节点造成的压力,提高系统稳定性。
- 任务“生产方”与“消费方”可以是不同业务集群(相当于把一个大的请求分成多个小请求去执行)。
生产者:接收批量请求这一方。
消费者:接收到批量请求后去调哪个业务集群去执行。
2.组成部分:
- 跑批核心服务 接收批量请求并异步执行任务跑批
- 业务互斥锁(可选) 解决各种业务之间互相排斥的问题
- 任务中心(可选) 展示批量操作的任务明细,失败任务的重试
对接跑批服务后整体的交互方式:
比如:批量创建offer,首先给该业务加互斥锁。然后提交到跑批服务,然后跑批服务就把这一批业务进行分解,分解成n个子任务去调用业务集群去执行,执行完之后释放互斥锁。执行完之后的结果需要同步到任务中心,可以实时的看到任务执行的结果和进度。
3.业务调度
跑批服务调度业务执行时,分为“同步调度”和“异步调度”,具体如下:
同步调度
优势:可以精确控制调度的TPS
不足:由于同步等待业务执行结果,占用系统资源;接口read timeout时,结果未知
场景:逻辑执行耗时较短的业务,例如:阶段移动、通知候选人
异步调度
优势:调度请求发送后立即返回,节省系统资源占用;不会出现业务处理超时的问题
不足:业务执行结果未回调同步时,结果未知
场景:逻辑执行耗时较长,需要长时间阻塞等待的业务,例如:数据导出、简历导出
比如导入1个简历还是上万个简历,他都是一个请求,等待的时间是不可估计的。导入简历少的话使用同步就不会有什么延迟,但是如果简历特别多,应该使用异步调度的方式通知用户。
与“同步调度”不同的是,业务逻辑需要放在线程池中异步执行,且最后需要回调跑批服务,上报执行结果(重要!!!)
在实现被异步调度的业务逻辑实现时,一定要考虑线程池的隔离,避免影响正常业务(???)
4.业务互斥锁
背景:跑批任务本身是需要接受延迟处理,但是跑批操作可能会与其他操作冲突。
例如:对于某个申请来说,创建面试操作必须在面试阶段才能处理,那么在阶段移动操作跑批未完成之前,创建面试操作不能进行的。
目标:根据操作之间的排斥规则,自动进行过滤以及加锁支持批量加解锁。
**互斥是怎么判断的?**需要在Apollo里面配置,有一个配置项batch_operation_exclusion_rule,进行业务互斥规则的配置。
数字为OperationTypeEnum的操作code码:{0:[0,1,2], 6:[0,1]}
0: [0,1,2]的含义:在code为0的操作处理过程中,尚未处理的数据不允许进行0、1、2的操作。
以下配置都可以在Apollo上面进行配置:
当前业务走跑批服务的最低数量 作业优先级 org优先级 单位时间最大的请求数量限制 通知同步还是异步方式
锁过期时间: 非跑批:5min 跑批:1天
5.任务中心
目标:实现跑批任务的可视化界面,HR 可以清晰了解每个任务的执行情况,并可以对失败任务进行重试操作。。
支持:1.任务明细查看 2.批量取消操作 3.失败重试
6.案例-现在方式:
1.系统资源动态调整:通过设定操作的优先级,按照比例动态为不同操作分配可执行的资源数量。(比如:阶段移动和创建面试之间是有一些资源占用的,阶段移动之后需要发通知给候选人,阶段一点延迟小,发通知延迟高,我们可以把资源有限调给更重要更及时的这种阶段移动,进行合理的优先级配置)
2.提高单节点稳定性:将大批量请求借助整个业务集群进行处理,降低单节点风险;控制跑批任务的TPS,避免出现高压力运行,进而保护系统。(通过发出和进行调度的TPS来控制单个节点的稳定性)
3**.削峰填谷**:当瞬间出现大量异步任务时,可以通过DB缓冲,将对系统的影响降到最小。
7.辅助功能:
1.支持多环境隔离执行(例如:灰度环境切分流量)
2.支持org级别黑名单(可用于场景:org迁移、异常org拦截)
3.过期任务定时清理,默认有效期30天
4.“业务互斥锁”超时自动解锁(跑批时使用的超时时间默认1天,非跑批使用超时时间为5分钟)
5.任务积压监控,在服务监控大盘面板中可看,监控指标如下:
- org维度积压任务量
- job操作维度积压任务量
- 超过30分钟待执行任务量
统计:
ATS侧单批次操作数量最高值(单个请求的批量的最大值):
创建offer:77
阶段移动:7278
推荐到其他职位:716
通知候选人:7020
导出简历:9701
导出数据:7654
8.原理实现-跑批服务(重点)
1.跑批服务核心:
- 队列:借助redis的zset实现MokaBatch服务集群共享,并通过score控制任务顺序(每一个任务是一个queue,多个任务是多个不同的queue,通过score来控制不同任务的执行顺序)。
- 发布待处理任务进队列。
- 单个队列最大放入1000条数据。
2.从队列拉任务并执行:
从待处理任务队列取任务并放入处理中的队列中,然后取出处理中队列的任务进行任务的发送。
拉取的规则:1、org优先级优先 2、其次,时间优先
score的值:优先级+时间戳 组成。
拉取任务代码是通过lua脚本实现:(使用lua脚本保证原子性操作!!! 任务都是批量操作)
3.任务发送时的步骤:
首先,进行限流验证,不通过需要将任务归还给队列。(限流去Apollo配置查看最大限流任务数然后进行对比)
其次,发起任务并记录执行结果。分情况:判断如果是同步调度就直就借助线程池发起任务;如果是异步调度就发布任务到MokaBatch服务并记录状态。(同样是Apollo配置判断是否达到异步处理的最低任务数量)
4.限流实现(使用hash方式限流) 为什么使用hash 而不是 keyvalue方式?
场景:我们需要一次性获取所有正在执行的操作及其对应的操作类型和占用资源数,而Key-Value方式可能需要遍历整个数据集来获取这些信息,这样做性能较低,尤其在高并发或大规模的情况下。Hash 方式则可以避免这个问题。
避免遍历:Key-Value方式需要遍历所有键来获取操作信息,效率低。而Hash通过哈希函数直接定位操作,能够一次性获取所有相关数据,查询效率更高。
多维度存储:Key-Value只能存储单一的值,难以管理操作的多维信息(如操作类型、资源占用等)。Hash方式允许存储多个属性,方便快速查询和更新操作状态。
高并发场景:在高并发下,Key-Value的遍历开销大,而Hash的分桶机制可以更好地分配请求,提升系统性能和扩展性。
批量操作效率:Hash结构支持快速的批量查询,能够一次性获取所有操作和资源占用情况,而Key-Value需要逐个操作,效率较低。
批量简历处理 批量职位发布和更新 批量候选人状态更新 批量通知和沟通 批量导出数据 批量数据清理和归档 批量生成报告
5.Elastic-Job
利用 Elastic-Job 和 ZooKeeper 实现分布式任务调度,涵盖任务分片、分布式锁和节点选主,确保系统高效稳定运行。
区别?
ElasticJob:采用分布式数据库存储任务元数据,支持分片广播、分片失效转移等高可用特性,适合于大规模任务调度和处理。
XXL-Job:通过调度中心实现任务的管理和调度,支持任务执行日志管理、报警等功能,适用于任务的集中调度和监控管理。
场景描述:
Moka作为多租户的HR SaaS系统,需要同时处理多个公司的简历筛选任务,在招聘时期,筛选简历的份数可达10万以上的。使用ElasticJob可以将这些任务分片,分配到不同节点上执行,确保系统在高并发下仍能高效、稳定地运行。
为什么适合用ElasticJob:
- 高并发简历筛选:简历筛选任务可能会由多个HR同时发起,系统需要能够处理不同HR的并发需求。通过ElasticJob,系统可以将这些任务分片,分布到不同的节点上执行,避免单一节点的过载问题。(分片!!!)
- 任务拆分与分片:ElasticJob支持任务分片处理,可以将大量简历按用户ID范围、简历创建时间进行分片,并将每个分片分配给不同的节点或线程去处理。每个节点处理自己的分片,极大提高了处理速度和系统的负载能力。(分片!!)
- 任务容错与重试:在分布式环境下,任务执行过程中可能会有失败情况。ElasticJob支持任务的重试和失败容错机制,保证筛选任务的成功率。即便某个节点出现问题,任务也会自动转移到其他节点继续执行,保证任务的可靠完成。(任务容错与重试!!)
实现方式:
假设一个HR发起简历筛选任务,该任务包含100,000份简历。系统通过ElasticJob进行分布式调度,将这10,000份简历按用户ID分成10个分片,每个分片包含1,0000份简历:
- ElasticJob根据配置的分片规则,将10个分片分配给不同的节点。
- 每个节点分别处理自己负责的10,000份简历,执行筛选操作(如匹配关键字、评分等)。
- ElasticJob负责协调各个节点的任务执行情况。如果某个节点出现故障,ElasticJob会自动将该分片任务重新分配给其他正常的节点。
- 当所有分片任务完成后,ElasticJob会通知HR简历筛选任务已完成。
总结:
这个场景充分体现了ElasticJob的分片、容错、分布式调度等优势,尤其适用于需要处理大批量数据且需要在多节点间分布执行的任务。在高并发简历筛选中,ElasticJob能够确保任务的高效处理,并提供可靠的故障恢复机制。
ElasticJob使用ZooKeeper来确保任务分片的唯一性和不重复执行,其核心原理是通过ZooKeeper的分布式锁机制和节点协调机制,来管理和分配任务分片。下面是实现原理的详细解释:
1. 任务分片的概念:
- 在ElasticJob中,一个大任务可以被分成多个分片(Shard),每个分片代表任务的一部分,多个节点可以并行处理这些分片,提升任务执行效率。
- 分片示例:假设有10000条简历数据需要筛选,可以分成10个分片,每个分片包含1000条数据,分发给不同的节点去执行。
2. ZooKeeper的分布式锁:
- ZooKeeper通过分布式锁的机制来确保同一时间一个分片只能被一个节点处理,防止重复执行。
- 实现过程
- 当一个任务被触发时,ElasticJob通过ZooKeeper将任务分片信息保存在ZooKeeper的ZNode(类似目录节点)中,每个分片会在ZooKeeper中注册为一个子节点。
- 每个节点在执行任务时,会通过ZooKeeper的分布式锁机制对这些分片进行竞争,获取执行权限。
3. 锁的获取与分片分配:
- 当多个节点启动时,它们会向ZooKeeper请求锁,以获取某个分片的执行权限:
- 节点启动:ElasticJob中的节点启动时,会向ZooKeeper注册自己,并尝试获取任务分片的锁。(多实例需先获取锁!!!)
- 竞争锁:每个节点都会尝试对一个或多个分片加锁。如果某个节点成功获取某个分片的锁,它就可以执行该分片的任务,同时其他节点则无法获取该分片的锁,避免了同一个分片的重复执行。
- 锁的释放:当分片任务执行完毕后,节点会释放锁,ZooKeeper会更新节点状态,标记该分片任务已完成。
4. 任务分片唯一性保证:
- ZooKeeper通过监控机制来确保任务分片的唯一性:
- 每个分片在ZooKeeper中只有一个唯一的ZNode,所有节点在执行任务时,都会向该ZNode发起锁请求。
- 如果某个节点成功加锁,则其他节点只能等待该锁释放,保证了分片的唯一性执行。
- 这确保了一个分片在同一时刻只能由一个节点来执行。
5. 故障处理与任务转移:
- 如果某个节点在执行任务时发生故障(如宕机或失联),ZooKeeper会通过其会话过期机制来检测节点的失效:
- 节点失效检测:ZooKeeper持续监控各个节点的心跳,如果某个节点长时间没有响应,ZooKeeper会将其视为失效,自动释放该节点持有的锁。
- 任务重分配:当锁被释放后,其他节点可以再次竞争锁并获取该分片的执行权限,确保任务能够被转移到其他健康的节点执行,避免任务丢失或重复执行。
6.节点选主:
在ElasticJob中,节点选主于确保全局任务协调、故障恢复、任务重新分配等关键功能的唯一性和高效性。它确保只有一个主节点负责这些重要任务,其他节点则专注于具体的分片任务执行。(重要!!!)
7. 任务调度流程总结:
- ElasticJob启动任务时,将任务分片分配给多个节点。
- 每个节点通过ZooKeeper争抢分片的锁,成功获取锁的节点可以独占执行该分片任务。
- 当节点失效时,ZooKeeper自动检测并释放该分片的锁,其他节点重新竞争锁,确保任务继续执行。
- 任务完成后,锁会被释放,任务的状态在ZooKeeper中被更新。
css复制代码节点A 节点B 节点C (节点)| | ||--- 分片1 ---| | (分片1由节点A执行)| |--- 分片2 ---| (分片2由节点B执行)| | |--- 分片3 (分片3由节点C执行)
- 每个分片在ZooKeeper中都有唯一的ZNode,节点通过ZooKeeper竞争锁。
ElasticJob通过ZooKeeper的分布式锁机制,确保任务分片在同一时间只能由一个节点执行,防止重复执行。当节点失效时,ZooKeeper能够检测并重新分配任务,保证任务的高可用性和正确性。这一机制确保了任务的唯一性执行和故障自动恢复。
学习链接:https://blog.csdn.net/u011066470/category_10201923.html
利用ElasticJob和ZooKeeper实现分布式任务调度管理,包括任务分片分配、分布式锁实现和节点选主,确保系统高效、稳定运行。
什么是分布式任务调度? 单体(发放优惠券 生成报表) —> 分布式(优惠券服务 结算服务)
在分布式系统环境下运行任务调度,就是分布式任务调度。
ElasticJob 教程
2.jdk提供的定时任务的实现方式
1.多线程方式 Runnable接口等
2.线程池方式 ScheduedExecutorService
3.Timer接口方式
4.Quartz方式(按照日历 cron表达式)
区别在于前面3个只能间隔执行,qurats可以间隔,也可以在某个时刻执行。
秒 分 时 天 月 周 年
3.分布式调度要实现的目标
1.并行的任务调度
2.高可用 :若某一个实例宕机,不影响其他实例来执行任务。
3.弹性扩容 :当集群中增加实例就可以提高并执行任务的处理效率。
4.任务管理与监测
5.避免任务重复执行:当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
分布式锁:,多个实例在任务执行前首先需要获取锁,如果获取失败那么就证明有其他服务已经再运行,如果获取成功那么证明没有服务在运行定时任务,那么就可以执行。(上厕所的现实例子)
Zookeeper选举机制:利用ZooKeeper对Leader实例执行定时任务,有其他业务已经使用了ZK,那么执行定时任务的时候判断自己是否是Leader,如果不是则不执行,如果是则执行业务逻辑,这样也能达到我们的目的。(石头剪刀布的例子)
4.zookeeper
在学习Elastic-job执行原理时,有必要大致了解一下ZooKeeper是用来做什么的,因为:
1.Elastic-job依赖ZooKeeper完成对执行任务信息的存储;
2.咱们可以把ZooKeeper想象为一个特殊的数据库,它维护着一个类似文件系统的树形数据结构,ZooKeeper的客户端(如Elastic-Job任务执行实例)可以对数据进行存取:
zookeeper节点分类:持久化节点 持久化顺序节点 临时节点 临时顺序节点
实例选举实现过程分析:(重要!!!)
每个Elastic-job的任务执行实例作为ZooKeeper的客户端来操作ZooKeeper的znode
1)任意一个实例启动时首先创建一个 /server 的PERSISTENT节点(持久化节点)。
2)多个实例同时创建 /server/leader EPHEMERAL子节点(临时节点)。
3)/server/leader子节点只能创建一个,后创建的会失败。创建成功的实例被选为leader节点,用来执行任务。
4)所有任务实例监听 /server/leader 的变化,一旦节点被删除,就重新进行选举,抢占式地创建 /server/leader节点,谁创建成功谁就是leader。
5.作业分片
分片概念
作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项。
例如:Elastic-job快速入门中文件备份的例子,现有2台服务器,每台服务器分别跑一个应用实例。为了快速的执行作业,那么可以将作业分成4片,每个应用实例个执行2片。作业遍历数据的逻辑应为:实例1查找text和image类型文件执行备份;实例2查找radio和vedio类型文件执行备份。如果由于服务器扩容应用实例数量增加为4,则作业遍历数据的逻辑应为:4个实例分别处理text、image、radio、tideo类型的文件。
可以看到,通过对任务合理的分片化,从而达到任务并行处理的效果,最大限度的提高执行作业的吞吐量。
分片项与业务处理解耦
Elastic-lob并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
最大限度利用资源
将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
**分片按照什么分片?**文件类型 文件编号(奇数偶数)
6.具体集成步骤
1.配置pom文件 引入elasticJob依赖
2.配置job执行类 实现SimpleJob接口重写execute方法
3.编写实体类
4.配置类@Configuration(zookeeper配置类 和 elasticJob配置类):
配置zookeeper注册中心 并 开启任务(开启前要配置 cron表达式 分片数量)
https://blog.csdn.net/u011066470/article/details/107441389
实习期间有哪些收获?
1.对git的使用更熟练,团队开发的流程一般是:从 release分支 new 一个 branch 在test环境,代码写完后测试没有问题,就需要提交并push到gitlab,发送一个merge请求,merge通过后会合并到test分支。然后使用Jenkins一键部署到k8s,测试环境没有问题才将自己的分支merge到release分支,然后在生产环境下一键部署。
上线后还需要和客户那边的开发人员进行线上联调,比如会议室对接的时候需要测试各种情况是否可行,保证预约一个新的会议室后,再预约同一个会议室 但时间有交集,这种情况是无法预约的;取消一个会议室后,该会议室该时间段进行解绑(空闲状态),保证其他人可以预约这个时间段的这个会议室等等。
2.实习期间做的最多的就是数据同步,基本上就是调用客户接口,比如组织人员同步,需要调用客户接口获取组织人员列表,将客户的组织人员转化为Moka这边的组织人员。或者是将moka这边的数据推送给客户,比如导入ehr,将待入职的人的word简历在moka系统进行导入ehr,实际上就是对word解析后的json转化为客户那边候选人信息的json格式,然后将候选人信息推送给客户系统。使我对Json字符串的处理更加熟练,我们使用的是阿里巴巴的fastjson,比如:导入ehr(候选人推送)需要获取moka这边的候选人json字符串里面的某些属性值。1.使用JSON.parseObject(data);将json字符串转为JSONObject对象,然后调用JSONObject的getString(“data”);方法获取内层的json字符串的值。2.也可以调用另外一个构造方法 JSON.parseObject(dataDTOString, DataDTO.class);传入json字符串和最终转化的实体类的class类,就能将json字符串转为java对象。
//候选人信息 json字符串 转 对象
JSONObject dataString = JSON.parseObject(data);
String dataDTOString = dataString.getString("data");
DataDTO dataDTO = JSON.parseObject(dataDTOString, DataDTO.class);
3.公司微服务使用的是Eureka作为注册中心,Apollo作为配置中心。在实习期间Apollo使用还是的比较多,公司会有staing测试环境和prod开发环境,Apollo上一般用来配置客户接口的一些鉴权相关的参数,客户接口域名 URL。我记得有一次上线后报错,最后就是发现Apollo上线后的一些配置还是测试环境下的,不管是鉴权还是域名都需要改成上线的值。PO对象Id字段Long类型踩坑实践
4.对日志的查看和使用更熟练,bug的排查,一般是写代码的时候对于一些返回结果(调用客户接口)或者数据转化后的json对象需要打印日志,使用@SLF4J的log.info()打印出运行时的一些结果,方便出现异常后好查看。公司使用的是阿里云的日志服务,通过日志全局搜索就能查看到指定日志内容。
瑞拓智联远程在线质量监控平台
参与开发四川中烟三联有限公司的瑞拓智联远程在线质量监控平台(监控中心),该平台实时监控测试台烟卷材料的各项检测指标,并在出现超标时自动报警,及时通知相关人员,确保质量问题能够迅速响应。
**技术栈:**SpringBoot、SpringSecurity、MyBatis-Plus、MySQL、Redis、MQTT、RabbitMQ、MagicAPI、Docker等
主要职责:
1.SpringSecurity + JWT双token
通过 SpringSecurity 集成 JWT,采用双 token 无感刷新机制优化登录认证与授权,提升系统安全性与用户体验.
为什么使用双token?
单token存在一些缺陷,过期时间设置太短比如30分钟,用户的体验就很差。过期时间设置太长,token就容易被盗取不安全。双token可以实现用户在access_token过期后,通过refresh_token来续期,用户体验好,同时刷新也保证了安全性(每次刷新会创建全新的token)。
讲一下实现过程?
用户首次登录时,我们将用户名密码封装到UsernamePasswordAuthenticationToken对象当中,其次需要实现UserDetailsService接口并重写loadUserByUsername根据用户名获取用户信息,最后我们会调用AuthenticationManager对象的authenticate方法并传入封装了用户名密码的UsernamePasswordAuthenticationToken对象,内部会比较前端传入的用户名密码和数据库查询的用户名密码是否一致。
如果一致就登陆成功,这个时候会将用户信息权限信息封装到一个LoginUser对象当中并缓存到Redis。同时根据用户id生成双token,access_token用于免登录的认证,refresh_token用于刷新过期时间,refresh_token缓存到redis。用户再次访问资源时会先携带access_token,拦截器会对请求进行拦截并判断access_token是否合法是否过期,过期的话就会通知前端,前端会携带refresh_token再次发送请求,refresh_token没有过期就会刷新双token的过期时间,采用的是先删除redis中refresh_token,然后根据JWT生成俩个全新的双token,将新的refresh_token缓存到redis并将新的双token响应给前端。
2.Redis缓存数据
实现基础数据模块,管理检测项、牌号、机台等,并结合 Redis 缓存优化,缓存命中率达 95%,响应时间缩短至 50ms
3.策略模式-多种样品检测方式
采用策略模式实现自动检测、手动检测、自定义检测等多种样品检测方式,提升代码的复用性和扩展性。
Moka多租户 会议室功能 采用策略模式
4.MQTT数据接收和远程控制
开发监控模块,支持表格、卡片、地图展示,基于 MQTT 协议实现测试台的远程控制和数据接收。使用 Eclipse Paho MQTT 库完成客户端连接、消息订阅与数据解析,通过 QoS 配置确保消息可靠传输,实时控制测试台的开关机、测量、标定等操作。
MQTT 的特点
为什么使用MQTT?
- 轻量化协议:
- 消息头部非常小,仅需 2 个字节,适合低带宽环境。
- 使用简化的消息格式,减少冗余信息,数据传输高效。
- 发布/订阅模型:
- 客户端通过代理订阅主题,灵活应对多对多通信。
- 消息通过代理转发,无需客户端直接请求服务器。
- 长连接:
- 基于 TCP 的长连接,在连接建立后可持续通信,减少了频繁握手和建立连接的开销。
- QoS(服务质量)保证:
- 提供三种服务质量级别(QoS 0、1、2),确保消息可靠传输。用户可以根据需求选择不同的可靠性级别。
- 低功耗:
- 适合电池供电设备,使用 Keep Alive 机制和长连接降低频繁通信的功耗。
- 高实时性:
- 由于保持长连接和异步通信,能够实时接收和发布数据,适合需要低延迟的场景。
场景:适合在物联网、远程控制**、实时监控**等场景中使用,特别是在低带宽、不稳定网络的情况下表现尤为突出。
5.报警模块-RabbitMQ
通过 RabbitMQ 实现报警模块,采用持久化和消息确认机制确保消息可靠性,并使用唯一消息 ID 实现幂等性,防止重复报警。
1. 需求背景
首先,简单介绍一下为什么需要开发报警模块:
- 需求:在我们的监控系统中,测试台需要对一些关键指标进行实时监控。当指标超出预设范围时,需要及时发出报警通知,确保相关人员能够迅速采取措施。
- 目的:为了实现这一需求,我们开发了一个报警模块,具备灵活的报警条件设置、报警等级设置,以及多种通知方式管理。
2. 技术选择与架构设计
接着,介绍你选择的技术栈和设计思路:
-
RabbitMQ 作为消息队列:RabbitMQ 被选中用来管理报警消息的传递。它支持消息的持久化和确认机制,能够确保每条报警消息的可靠传输,避免丢失。
-
报警模块的架构
:
- 生产者:当某个指标超出报警条件时,系统会作为生产者生成报警消息,并将消息发送至 RabbitMQ 队列。
- 消费者:消费者从队列中接收报警消息,处理后触发相应的报警通知(如短信、邮件或推送)。
- 报警条件和等级设置:我们为用户提供了灵活的报警条件和报警等级设置,不同的报警等级触发不同的处理流程和通知方式。
3. 具体实现细节
然后,深入解释你如何实现每个关键功能:
- 报警条件设置:用户可以通过后台界面设置不同的报警触发条件。比如,温度、压力等指标超过阈值时,系统会自动生成报警消息。
- 报警等级设置:报警等级分为警告、严重、紧急等,系统根据不同等级执行不同的响应策略。比如,严重级别的报警会通过多种方式通知关键人员。
- 通知方式管理:我们实现了多种通知方式(短信、邮件、应用内推送等),用户可以选择适合的方式来接收报警信息。
4. RabbitMQ 的使用与优化
详细解释如何使用 RabbitMQ 确保消息的可靠传输:
- 消息持久化:我们在生产者端启用了消息持久化,以确保即使 RabbitMQ 服务器出现问题,消息也不会丢失。
- 消息确认机制:消费者在处理完报警消息后会通过 ACK(消息确认机制)告知 RabbitMQ 该消息已成功处理。如果 RabbitMQ 没有收到确认,消息将重新放入队列中进行处理,确保消息的可靠性。
- 幂等性处理:为了防止重复报警,我们在每条消息中引入了唯一的消息 ID,并在处理时校验消息 ID,确保相同的消息不会被多次处理。
5. 性能优化与监控
强调你做出的优化工作:
- QoS 配置:我们使用了 RabbitMQ 的 QoS 配置,限制了每次消费者处理的消息数量,确保在高并发情况下系统不会过载,同时保证消息的可靠传输。
- 性能监控:通过系统日志和 RabbitMQ 自带的监控工具,我们能够实时跟踪消息的处理情况,确保消息处理的高效性和稳定性。
6. 结果与系统效果
最后,简单说明这个模块上线后的实际效果:
- 可靠性提升:通过持久化和消息确认机制,报警模块实现了高可靠的消息传输,避免了消息丢失或重复报警的问题。
- 系统的灵活性:用户可以根据具体需求灵活设置报警条件和等级,选择合适的通知方式,使报警模块能够更好地适应不同业务场景。