Resources & version information
Flink version: 1.14.6
Runtime platform: K8s
HA backend: ZooKeeper (using K8s's etcd-backed HA should work the same way)
Flink HA internals in detail
At startup, Flink creates HighAvailabilityServices, which provide HA and related basic services, including LeaderRetrievalService and LeaderElectionService:
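- LeaderRetrievalService lets the callers of an HA component find that component's leader. For example, the JobManager uses the ResourceManagerLeaderRetriever service to obtain the leader of the ResourceManager.
- LeaderElectionService handles leader election. Once a component is elected leader, it can start serving requests, and LeaderRetrievalService can then retrieve the registered, valid leader.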
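The contract between the election side and the retrieval side can be illustrated with a toy in-memory model. ToyLeaderStore and its methods are hypothetical and do not correspond to any Flink class. The model assumes that every new leadership gets a fresh random UUID as its session id. This matches the observation later in this post that the leaderId changes on every HA switch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

/**
 * Toy model (NOT Flink code) of the election/retrieval contract. The election side
 * publishes (leaderAddress, leaderSessionId); the retrieval side is notified of every
 * change, including "no leader" as (null, null).
 */
public class ToyLeaderStore {
    private String leaderAddress;
    private UUID leaderSessionId;
    private final List<BiConsumer<String, UUID>> retrievers = new ArrayList<>();

    /** Retrieval side: register a listener; it is immediately told the current leader. */
    public synchronized void addRetriever(BiConsumer<String, UUID> listener) {
        retrievers.add(listener);
        listener.accept(leaderAddress, leaderSessionId);
    }

    /** Election side: become leader only if nobody currently holds leadership. */
    public synchronized UUID tryAcquire(String candidateAddress) {
        if (leaderSessionId != null) {
            return null; // another contender is already leader
        }
        leaderSessionId = UUID.randomUUID(); // fresh session id for every new leadership
        leaderAddress = candidateAddress;
        publish();
        return leaderSessionId;
    }

    /** Leader lost (crash, session expiry): retrievers are told there is no leader. */
    public synchronized void revoke() {
        leaderAddress = null;
        leaderSessionId = null;
        publish();
    }

    private void publish() {
        for (BiConsumer<String, UUID> r : retrievers) {
            r.accept(leaderAddress, leaderSessionId);
        }
    }
}
```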
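LeaderRetrievalService explained
The structure of DefaultLeaderRetrievalService is as follows:
- The default implementation of LeaderRetrievalService is DefaultLeaderRetrievalService. It also implements the LeaderRetrievalEventHandler interface, which has a single method, notifyLeaderAddress, called back when the leader state changes.
- DefaultLeaderRetrievalService.notifyLeaderAddress first checks whether the service is still running. If it is, it calls leaderListener.notifyLeaderAddress to notify the listener of the leader change.
- DefaultLeaderRetrievalService.leaderListener is a LeaderRetrievalListener. One implementation is JobManagerLeaderListener, which the TaskManager uses to watch for JobManager leader changes so that it can promptly update its connection information when the JobManager changes.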
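To make this delegation concrete, here is a simplified model. LeaderListener and SimpleRetrievalService are hypothetical stand-ins, not Flink's classes. They only mirror the behavior described above: forward each leader change to the listener while the service is running, and drop it otherwise.

```java
import java.util.UUID;

/**
 * Simplified model (NOT Flink's classes) of how DefaultLeaderRetrievalService
 * forwards leader changes to its LeaderRetrievalListener.
 */
public class RetrievalServiceModel {

    /** Plays the role of LeaderRetrievalListener, e.g. JobManagerLeaderListener. */
    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderId);
    }

    /** Plays the role of DefaultLeaderRetrievalService. */
    static final class SimpleRetrievalService {
        private final Object lock = new Object();
        private LeaderListener leaderListener;
        private boolean running;

        void start(LeaderListener listener) {
            synchronized (lock) {
                leaderListener = listener;
                running = true;
            }
        }

        void stop() {
            synchronized (lock) {
                running = false;
            }
        }

        /** Event-handler callback, invoked by the HA backend (e.g. the ZK driver) on change. */
        void notifyLeaderAddress(String leaderAddress, UUID leaderId) {
            synchronized (lock) {
                if (running) {
                    leaderListener.notifyLeaderAddress(leaderAddress, leaderId);
                }
                // when not running, the update is silently dropped
            }
        }
    }
}
```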
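How can an alert be sent when an HA switch happens?
An HA switch is bound to happen when a JobManager crashes and restarts, or when ZK becomes unavailable and then recovers. Reading the code also shows that every HA switch changes the leaderId (a new one is issued on each connection). With this background, let us analyze each case in turn.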
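JobManager crash:
The logs show that leaderListener.notifyLeaderAddress is called twice. The first call clears the leader information (the address and leaderId are set to null). After the JobManager restarts and is elected leader, notifyLeaderAddress is called again with the new leaderId.
ZK unavailable:
The logs show that leaderListener.notifyLeaderAddress is called three times. If the leaderId was originally A, it is first set to null, then back to A, and finally to B.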
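Conclusion:
Based on these cases, we can record the leaderId on every call to leaderListener.notifyLeaderAddress and send an HA change alert whenever it changes, whether it becomes null or goes from A to B. To be more precise, alert only when the leaderId goes from A to B, or from null to B, where B differs from the last non-null leaderId. With this rule, the ZK sequence A, null, A, B raises exactly one alert (on A to B) and no alert on the transient null to A.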
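The alert rule above can be sketched as a small state machine. LeaderChangeDetector is a hypothetical helper, not part of Flink. It assumes that "B" means a leaderId different from the last non-null one. It also assumes that the very first leader seen at startup should not trigger an alert.

```java
import java.util.UUID;

/**
 * Hypothetical helper (not part of Flink) implementing the alert rule:
 * remember the last non-null leaderId and alert only when a new non-null
 * leaderId differs from it (A -> B, or null -> B where B != A).
 */
public class LeaderChangeDetector {
    // last non-null leaderId seen; null until the first leader is observed
    private UUID lastKnownLeaderId;

    /**
     * Feed every leaderId passed to notifyLeaderAddress.
     * Returns true when an HA change alert should be sent.
     */
    public synchronized boolean onLeaderId(UUID leaderId) {
        if (leaderId == null) {
            // leader temporarily lost: no alert yet, keep the last known leader
            return false;
        }
        if (lastKnownLeaderId == null) {
            // first leader ever observed (e.g. at startup): record it, no alert
            lastKnownLeaderId = leaderId;
            return false;
        }
        boolean changed = !lastKnownLeaderId.equals(leaderId);
        lastKnownLeaderId = leaderId;
        return changed;
    }
}
```

Feeding it the JobManager-crash sequence (A, null, B) yields one alert on B. The ZK sequence (A, null, A, B) also yields a single alert, on the final B.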
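How can the alert be sent from inside leaderListener.notifyLeaderAddress?
Use a Java agent and attach it when Flink starts (-javaagent:agent.jar=123, where 123 is the agent argument). The code can be injected either at method entry or inside the synchronized block of the method below (the notifyLeaderAddress of JobManagerLeaderListener):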
@Override
public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable final UUID leaderId) {
    // candidate injection point 1: method entry (sees every raw notification)
    Optional<JobMasterId> jobManagerLostLeadership = Optional.empty();

    synchronized (lock) {
        // candidate injection point 2: inside the synchronized block
        if (stopped) {
            LOG.debug(
                    "{}'s leader retrieval listener reported a new leader for job {}. "
                            + "However, the service is no longer running.",
                    DefaultJobLeaderService.class.getSimpleName(),
                    jobId);
        } else {
            final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);

            LOG.debug(
                    "New leader information for job {}. Address: {}, leader id: {}.",
                    jobId,
                    leaderAddress,
                    jobMasterId);

            if (leaderAddress == null || leaderAddress.isEmpty()) {
                // the leader lost leadership but there is no other leader yet.
                jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
                closeRpcConnection();
            } else {
                // check whether we are already connecting to this leader
                if (Objects.equals(jobMasterId, currentJobMasterId)) {
                    LOG.debug(
                            "Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.",
                            jobId);
                } else {
                    closeRpcConnection();
                    openRpcConnectionTo(leaderAddress, jobMasterId);
                }
            }
        }
    }

    // send callbacks outside of the lock scope
    jobManagerLostLeadership.ifPresent(
            oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
}
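A minimal agent skeleton for this approach might look like the following. It is a sketch under stated assumptions, not the author's implementation.

- HaAlertAgent, onLeaderAddress and alertSink are hypothetical names.
- The internal class name in TARGET is assumed from the DefaultJobLeaderService reference in the method above.
- The actual bytecode rewriting is deliberately left out. It would need a library such as Javassist or ByteBuddy.

The hook keeps state per jobId, because each JobManagerLeaderListener watches a single job.

```java
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical agent skeleton. premain registers a transformer that recognizes
 * the target class; onLeaderAddress is the static hook that rewritten bytecode
 * would call at the entry of notifyLeaderAddress.
 */
public class HaAlertAgent {

    // Assumed internal (slash-separated) name of the listener class shown above.
    static final String TARGET =
            "org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService$JobManagerLeaderListener";

    // jobId -> last non-null leaderId observed for that job
    private static final Map<String, UUID> LAST_LEADER = new ConcurrentHashMap<>();

    // Alert destination: a placeholder, swap in a real sender (webhook, IM bot, ...)
    static volatile Consumer<String> alertSink = msg -> System.err.println("[HA-ALERT] " + msg);

    /** Called by the JVM for -javaagent:agent.jar=<agentArgs>; agentArgs is e.g. "123". */
    public static void premain(String agentArgs, Instrumentation inst) {
        inst.addTransformer(new TargetTransformer(agentArgs));
    }

    /** Hook injected code would call, e.g. with (jobId.toString(), leaderAddress, leaderId). */
    public static void onLeaderAddress(String jobId, String leaderAddress, UUID leaderId) {
        if (leaderAddress == null || leaderAddress.isEmpty() || leaderId == null) {
            return; // leadership lost: wait for the next leader before deciding
        }
        UUID previous = LAST_LEADER.put(jobId, leaderId); // atomic swap, returns old value
        if (previous != null && !previous.equals(leaderId)) {
            alertSink.accept("HA failover for job " + jobId + ": leader " + previous
                    + " -> " + leaderId + " at " + leaderAddress);
        }
    }

    static final class TargetTransformer implements ClassFileTransformer {
        final String agentArgs;
        volatile boolean targetSeen;

        TargetTransformer(String agentArgs) {
            this.agentArgs = agentArgs;
        }

        @Override
        public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined,
                ProtectionDomain protectionDomain, byte[] classfileBuffer) {
            if (!TARGET.equals(className)) {
                return null; // null = leave this class unchanged
            }
            targetSeen = true;
            // A bytecode library would rewrite classfileBuffer here, inserting a call to
            // HaAlertAgent.onLeaderAddress(...) at method entry or inside synchronized (lock).
            // This skeleton performs no rewrite and returns null.
            return null;
        }
    }
}
```

The jar's manifest needs a `Premain-Class: HaAlertAgent` entry. Because this listener runs inside the TaskManager, the `-javaagent` flag would go on the TaskManager JVMs, for example through Flink's `env.java.opts.taskmanager` option. Every TaskManager running the job fires its own alert, so some deduplication on the receiving side may be needed.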
The final implementation is still to be written up.