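I. Preface
Why design a Kafka alerting scheme at all? A quick search turns up plenty of ready-made monitoring projects: KafkaOffsetMonitor, KafkaManager, Burrow, and so on. For details see: Kafka message backlog monitoring. Because the Kafka cluster used by our team's project is not managed by the company's kafka-manager, we had to build a simple alert ourselves.
II. Alerting scheme
The scheme needs two scheduled tasks, which communicate through a delay queue.
The task on the left periodically scans the configured Topic-Consumer list, fetches the consumption details through the Kafka API, and checks whether the message backlog exceeds the threshold. If it does, the Topic-Consumer pair is put into the delay queue.
The task on the right periodically takes one Topic-Consumer pair from the real queue behind the delay queue and checks the threshold a second time. An alert SMS is sent only if this second check also finds the threshold exceeded.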
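The two-task flow described above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions, not the implementation used in this article: the class name TwoStageAlarm, the string keys for Topic-Consumer pairs, and the explicit `now` parameter (in seconds) are all hypothetical, and an in-memory PriorityQueue stands in for the distributed delay queue. Unlike the task described above, which takes one pair per run, `confirm` drains every pair that is due.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** In-memory model of the two-task flow; time is passed in explicitly, in seconds. */
final class TwoStageAlarm {

    /** A Topic-Consumer pair waiting for its second check once dueAt is reached. */
    private static final class Pending {
        final String pair;
        final long dueAt;

        Pending(String pair, long dueAt) {
            this.pair = pair;
            this.dueAt = dueAt;
        }
    }

    private final PriorityQueue<Pending> delayed =
            new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.dueAt));
    private final Set<String> queued = new HashSet<>();
    private final long threshold;
    private final long delaySeconds;

    TwoStageAlarm(long threshold, long delaySeconds) {
        this.threshold = threshold;
        this.delaySeconds = delaySeconds;
    }

    /** Left task: enqueue every pair that is over the threshold and not already waiting. */
    void scan(Map<String, Long> lagByPair, long now) {
        for (Map.Entry<String, Long> e : lagByPair.entrySet()) {
            if (e.getValue() > threshold && queued.add(e.getKey())) {
                delayed.add(new Pending(e.getKey(), now + delaySeconds));
            }
        }
    }

    /** Right task: re-check pairs whose delay has expired; return those still over the threshold. */
    List<String> confirm(Map<String, Long> lagByPair, long now) {
        List<String> alerts = new ArrayList<>();
        while (!delayed.isEmpty() && delayed.peek().dueAt <= now) {
            String pair = delayed.poll().pair;
            queued.remove(pair);
            if (lagByPair.getOrDefault(pair, 0L) > threshold) {
                alerts.add(pair);
            }
        }
        return alerts;
    }
}
```

The point of the second check is that a backlog which clears up during the delay never produces an SMS; only a backlog that is still there after the delay does.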
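III. Preparation
1. Dependence on the configuration center
The configuration center is a key part of the alerting scheme: alert-related properties can be fetched from it dynamically, and the corresponding Java bean is refreshed. The configuration bean for the alert is shown below.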
import java.util.List;

import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
// ConfigCenterBean, ConfigField, ToInitial, ToRefresh, BaseConfigCenterBean, ConfigHelper,
// ConfigCenterUtils and ZookeeperPropertySource belong to the configuration-center code,
// which the source does not show; their imports are omitted here.

@ConfigCenterBean
@ConfigurationProperties(prefix = "wmhcontrol.alarm")
@Component
public class AlarmConstants extends BaseConfigCenterBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(AlarmConstants.class);

    // Phone numbers that receive the alert
    @ConfigField
    private String[] phones;

    // SMS template ID
    @ConfigField
    private String templateId;

    // Delay before the second check, in seconds
    @ConfigField
    private Integer delay = 600;

    // Polling period of the scan task, in seconds
    @ConfigField
    private Integer period = 60;

    // URL for fetching topic-consumer consumption details
    @ConfigField
    private String tcsr;

    // URL for viewing topic-consumer consumption details
    @ConfigField
    private String tclr;

    // Global default threshold
    @ConfigField
    private Integer threshold = 1000;

    // List of topic-consumer relations
    @ConfigField
    private List<TCR> tcrs;

    @ToInitial
    private void refreshProperties() {
        try {
            super.doBind();
            LOGGER.info(String.format("%s refreshed successfully..., current configuration=%s...", this.getModuleName(), this));
        } catch (Exception e) {
            LOGGER.error("Failed to bind the properties of AlarmConstants...", e);
        }
    }

    private void toRefresh() {
        try {
            if (isCfgCenterEffect()) {
                ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource();
                if (ConfigCenterUtils.propertySourceShouldRefresh(this.getModuleName(), propertySource)) {
                    this.refreshProperties();
                }
            }
        } catch (Exception e) {
            LOGGER.error("Failed to refresh the properties of AlarmConstants", e);
        }
    }

    @ToRefresh
    public Integer getThreshold() { return threshold; }
    public void setThreshold(Integer threshold) { this.threshold = threshold; }

    @ToRefresh
    public List<TCR> getTcrs() { return tcrs; }
    public void setTcrs(List<TCR> tcrs) { this.tcrs = tcrs; }

    @ToRefresh
    public String getTcsr() { return tcsr; }
    public void setTcsr(String tcsr) { this.tcsr = tcsr; }

    @ToRefresh
    public Integer getPeriod() { return period; }
    public void setPeriod(Integer period) { this.period = period; }

    @ToRefresh
    public Integer getDelay() { return delay; }
    public void setDelay(Integer delay) { this.delay = delay; }

    @ToRefresh
    public String[] getPhones() { return phones; }
    public void setPhones(String[] phones) { this.phones = phones; }

    @ToRefresh
    public String getTemplateId() { return templateId; }
    public void setTemplateId(String templateId) { this.templateId = templateId; }

    @ToRefresh
    public String getTclr() { return tclr; }
    public void setTclr(String tclr) { this.tclr = tclr; }

    @Override
    public String toString() {
        return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, AlarmConstants.class);
    }

    @Override
    public String getDefaultResourcePath() {
        return "config/alarm.properties";
    }

    @Override
    public String getConfigPrefix() {
        return "wmhcontrol.alarm";
    }

    @Override
    public String getModuleName() {
        // Means "alert configuration"; kept verbatim because it is passed to ConfigCenterUtils
        return "告警配置";
    }

    @Override
    public void refreshForEvent() {
        this.refreshProperties();
    }

    /**
     * Relation between a topic and a consumer.
     */
    public static class TCR {
        private String topic;
        private String consumer;
        private String channel;
        // Optional per-pair threshold; the global threshold applies when this is null
        private Integer threshold;

        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }

        public String getConsumer() { return consumer; }
        public void setConsumer(String consumer) { this.consumer = consumer; }

        public String getChannel() { return channel; }
        public void setChannel(String channel) { this.channel = channel; }

        public Integer getThreshold() { return threshold; }
        public void setThreshold(Integer threshold) { this.threshold = threshold; }

        @Override
        public String toString() {
            return "TCR{" +
                    "topic='" + topic + '\'' +
                    ", consumer='" + consumer + '\'' +
                    ", channel='" + channel + '\'' +
                    ", threshold=" + threshold +
                    '}';
        }
    }

    /**
     * One row (one partition) of the consumption details returned for a topic-consumer pair.
     */
    public static class TopicConsumerDetail {
        private String group;
        private String topic;
        private Integer pid;
        private Long offset;
        private Long logsize;
        private Long lag;
        private String owner;

        public String getGroup() { return group; }
        public void setGroup(String group) { this.group = group; }

        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }

        public Integer getPid() { return pid; }
        public void setPid(Integer pid) { this.pid = pid; }

        public Long getOffset() { return offset; }
        public void setOffset(Long offset) { this.offset = offset; }

        public Long getLogsize() { return logsize; }
        public void setLogsize(Long logsize) { this.logsize = logsize; }

        public Long getLag() { return lag; }
        public void setLag(Long lag) { this.lag = lag; }

        public String getOwner() { return owner; }
        public void setOwner(String owner) { this.owner = owner; }

        @Override
        public String toString() {
            return "TopicConsumerDetail{" +
                    "group='" + group + '\'' +
                    ", topic='" + topic + '\'' +
                    ", pid=" + pid +
                    ", offset=" + offset +
                    ", logsize=" + logsize +
                    ", lag=" + lag +
                    ", owner='" + owner + '\'' +
                    '}';
        }
    }
}
The alert has one global threshold, and each topic can override it with a threshold of its own.
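The fallback rule can be written down as a small helper. This is a minimal sketch: the class Thresholds and its method names are hypothetical and do not appear in the project; only the rule itself (use the per-pair threshold when it is set, otherwise the global one, and treat the backlog as excessive only when it is strictly greater) follows the configuration bean and the threshold check shown in this article.

```java
/** Hypothetical helper illustrating the "per-pair threshold overrides the global one" rule. */
final class Thresholds {

    private Thresholds() {
    }

    /** Returns the per-pair threshold if it is configured, otherwise the global default. */
    static long effective(Integer perPair, int globalDefault) {
        return perPair != null ? perPair : globalDefault;
    }

    /** True when the total lag is strictly above the effective threshold. */
    static boolean exceeded(long totalLag, Integer perPair, int globalDefault) {
        return totalLag > effective(perPair, globalDefault);
    }
}
```

With the default global value of 1000, a pair without its own threshold is flagged at a total lag of 1001, whereas a pair configured with a threshold of 200 is flagged at 201.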
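To see how the configuration center is linked to the Java bean, refer to: refreshing the properties of beans annotated with @ConfigurationProperties through the configuration center.
2. Dynamically configurable period of the scheduled task
This relies on org.springframework.scheduling.annotation.SchedulingConfigurer.
It is an optional interface to be implemented by @Configuration classes annotated with @EnableScheduling. It is typically used to set a specific TaskScheduler bean for executing scheduled tasks, or to register scheduled tasks programmatically instead of declaratively with the @Scheduled annotation. For example, this may be necessary when implementing Trigger-based tasks, which the @Scheduled annotation does not support.
The basic outline of the code is as follows.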
@Configuration
public class MessageCenterAlarmTask implements SchedulingConfigurer {

    // LOGGER and alarmConstants are fields of this class; the source omits their declarations.

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        try {
            // Check the queue every 5 s
            taskRegistrar.addFixedRateTask(() -> {}, 5 * 1000L);
            // The period of this task can be changed at runtime: it is re-read before each run
            taskRegistrar.addTriggerTask(() -> {}, triggerContext ->
                    new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
        } catch (Exception e) {
            LOGGER.error("Failed to initialise the message-center topic-consumer scheduled tasks...", e);
        }
    }
}
The two scheduled tasks in the code above correspond, in that order, to the right-hand and left-hand tasks of the alerting scheme.
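To show why a trigger-based task picks up a changed period without a restart, here is a rough JDK-only analogue. It is a sketch, not Spring's implementation: DynamicPeriodRunner and its method names are hypothetical, and the period is expressed in milliseconds for convenience. The idea is the same as in the trigger lambda above: the period is read again each time the next run is scheduled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Runs a task repeatedly, asking for the current period after every run. */
final class DynamicPeriodRunner {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable task;
    private final LongSupplier periodMillis;

    DynamicPeriodRunner(Runnable task, LongSupplier periodMillis) {
        this.task = task;
        this.periodMillis = periodMillis;
    }

    /** Starts the first run immediately. */
    void start() {
        scheduler.execute(this::runOnce);
    }

    /** Stops scheduling further runs. */
    void stop() {
        scheduler.shutdownNow();
    }

    private void runOnce() {
        try {
            task.run();
        } finally {
            scheduleNext();
        }
    }

    private void scheduleNext() {
        try {
            // The supplier is consulted here, so a new period takes effect from the next run on.
            scheduler.schedule(this::runOnce, periodMillis.getAsLong(), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The scheduler was stopped in the meantime; nothing more to do.
        }
    }
}
```

In the article's setup the supplier role is played by alarmConstants.getPeriod(), whose value is refreshed from the configuration center.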
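3. Implementing the delay queue
Either use Redisson's distributed delayed queue, or build a distributed delay queue from Java's DelayQueue plus RedisTemplate.
Reference: official Redisson documentation on the distributed delayed queue
Reference: how the Redisson DelayedQueue works
The Redisson cluster-mode configuration is as follows.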
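import java.io.IOException;
import java.util.Objects;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class RedissonBuilder {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedissonBuilder.class);

    public static RedissonClient getRedisson(String cluster) {
        // cluster is a comma-separated list of host:port entries
        String[] nodes = cluster.split(",");
        for (int i = 0; i < nodes.length; i++) {
            nodes[i] = "redis://" + nodes[i];
        }
        Config config = new Config();
        config.useClusterServers()      // use cluster servers
                .setScanInterval(2000)  // cluster state scan interval, in milliseconds
                .setConnectTimeout(2000)
                .addNodeAddress(nodes);
        try {
            return Redisson.create(config);
        } catch (Exception e) {
            LOGGER.error("RedissonClient getRedisson errors...", e);
            return null;
        }
    }
}

@Configuration
public class RedissonConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedissonConfig.class);

    @Bean
    public RedissonClient redissonClient(@Value("${redisAddress}") String cacheAddress) {
        RedissonClient rc = RedissonBuilder.getRedisson(cacheAddress);
        try {
            if (!Objects.isNull(rc)) {
                LOGGER.info(rc.getConfig().toJSON());
            }
        } catch (IOException e) {
            LOGGER.error("RedissonConfig redissonClient errors... ", e);
        }
        return rc;
    }
}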
Creating a delayed queue with Redisson
RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);
First create the target queue, then obtain the delayed queue from the target queue.
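4. Processing the data returned by the Kafka API
Reference: a simple wrapper around the Kafka-related APIs
Given a topic and a consumer, the following data can be retrieved. The Lag column is the number of messages that have not been consumed yet.
This data has to be mapped onto the AlarmConstants.TopicConsumerDetail Java bean, with the help of Spring's BeanWrapperImpl, as follows.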
private static List<AlarmConstants.TopicConsumerDetail> retrieveDetail(String detailResponse) {
    List<AlarmConstants.TopicConsumerDetail> result = new ArrayList<>();
    try {
        Scanner scanner = new Scanner(detailResponse.replace("<pre>", StringUtils.EMPTY).replace("</pre>", StringUtils.EMPTY));
        String[] propertyNames = null;

        // The first line holds the field names of the Java bean
        if (scanner.hasNextLine()) {
            String nameLine = scanner.nextLine();
            if (StringUtils.isBlank(nameLine)) {
                return result;
            }
            propertyNames = Arrays.stream(nameLine.trim().split("\\s+"))
                    .map(String::toLowerCase)
                    .toArray(String[]::new);
        }

        // Each remaining line holds the field values of one Java bean
        while (scanner.hasNextLine()) {
            String valueLine = scanner.nextLine();
            if (StringUtils.isBlank(valueLine)) {
                continue;
            }
            AlarmConstants.TopicConsumerDetail tcd = new AlarmConstants.TopicConsumerDetail();
            BeanWrapper br = new BeanWrapperImpl(tcd);
            String[] propertyValues = valueLine.trim().split("\\s+");
            // Ignore surplus columns so that a longer row cannot index past the header
            int columns = Math.min(propertyNames.length, propertyValues.length);
            for (int i = 0; i < columns; i++) {
                br.setPropertyValue(propertyNames[i], propertyValues[i]);
            }
            result.add(tcd);
        }
        LOGGER.info("Message center extracted topic-consumer details: " + result);
    } catch (Exception e) {
        LOGGER.error("Message center failed to extract topic-consumer details..." + detailResponse, e);
    }
    return result;
}
The result after processing looks like this.
[TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=0, offset=10956087, logsize=10956091, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=1, offset=10950487, logsize=10950502, lag=15, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=2, offset=10958523, logsize=10958529, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=3, offset=10955709, logsize=10955717, lag=8, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=4, offset=10956550, logsize=10956563, lag=13, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=5, offset=10956343, logsize=10956347, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=6, offset=10954124, logsize=10954128, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=7, offset=10949075, logsize=10949082, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=8, offset=10963839, logsize=10963843, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=9, offset=10958536, logsize=10958540, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=10, offset=10955316, logsize=10955327, lag=11, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=11, offset=10957850, logsize=10957856, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=12, offset=10954508, logsize=10954515, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=13, offset=10960468, logsize=10960477, lag=9, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=14, offset=10955540, logsize=10955544, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}]
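The header-then-rows mapping used in this section can also be tried out without Spring. The following is a minimal sketch: LagTableParser is a hypothetical class, rows are kept as plain maps instead of TopicConsumerDetail beans, and the header line "Group Topic Pid Offset logSize Lag Owner" used in the usage note is an assumption inferred from the bean's field names, not something shown in the source. Unlike retrieveDetail, it skips leading blank lines instead of returning an empty result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Parses a whitespace-separated table whose first non-blank line is the header. */
final class LagTableParser {

    private LagTableParser() {
    }

    /** Returns one map per data row, keyed by the lower-cased header names. */
    static List<Map<String, String>> parse(String text) {
        List<Map<String, String>> rows = new ArrayList<>();
        String[] header = null;
        for (String line : text.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines
            }
            String[] cells = trimmed.split("\\s+");
            if (header == null) {
                header = new String[cells.length];
                for (int i = 0; i < cells.length; i++) {
                    header[i] = cells[i].toLowerCase(Locale.ROOT);
                }
                continue;
            }
            Map<String, String> row = new LinkedHashMap<>();
            int columns = Math.min(header.length, cells.length);
            for (int i = 0; i < columns; i++) {
                row.put(header[i], cells[i]);
            }
            rows.add(row);
        }
        return rows;
    }

    /** Sums the "lag" column over all rows, i.e. over all partitions. */
    static long totalLag(List<Map<String, String>> rows) {
        long sum = 0;
        for (Map<String, String> row : rows) {
            String lag = row.get("lag");
            if (lag != null) {
                sum += Long.parseLong(lag);
            }
        }
        return sum;
    }
}
```

For an input made of the assumed header followed by the made-up rows "g1 t1 0 96 100 4 c1" and "g1 t1 1 85 100 15 c1", parse returns two rows and totalLag returns 19.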
IV. Alerting logic
1. Sending the SMS
private String toShortMessage(AlarmConstants.TCR tcr, Long lag) {
    JSONObject info = new JSONObject();
    StringBuilder text = new StringBuilder();
    String messageDate = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
    text.append("[Topic-Consumer threshold alert " + messageDate + "]\r\n");
    text.append("\tChannel: " + tcr.getChannel() + "\r\n");
    text.append("\tTopic: " + tcr.getTopic() + "\r\n");
    text.append("\tConsumer: " + tcr.getConsumer() + "\r\n");
    // Per-pair threshold if configured, otherwise the global one
    text.append("\tThreshold: " + (Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold()) + "\r\n");
    text.append("\tBacklog: " + lag + "\r\n");
    try {
        // Shorten the link to the details page so that it fits into the SMS
        String refUrl = alarmConstants.getTclr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
        JSONObject params = new JSONObject();
        params.put("url", refUrl);
        String shortUrlResponse = HttpClient.post("https://dwz.cn/admin/create", params.toJSONString(), "application/json");
        LOGGER.info("Short-link service response: " + shortUrlResponse);
        if (StringUtils.isNotBlank(shortUrlResponse)) {
            JSONObject shortUrlJson = JSON.parseObject(shortUrlResponse);
            Integer code = (Integer) FastJsonUtils.search(shortUrlJson, "Code");
            if (Integer.valueOf(0).equals(code)) {
                String shortUrl = (String) FastJsonUtils.search(shortUrlJson, "ShortUrl");
                if (StringUtils.isNotBlank(shortUrl)) {
                    text.append("\tView: " + shortUrl + "\r\n");
                }
            }
        }
    } catch (Exception e) {
        // The SMS is still sent without the link
        LOGGER.error("Failed to generate the short link...", e);
    }
    info.put("txt_name", "Message Center");
    info.put("txt_result", text.toString());
    return info.toJSONString();
}
2. Threshold check
// Returns the total lag over all partitions if it exceeds the threshold, otherwise -1
private Long thresholdCheck(AlarmConstants.TCR tcr) {
    String detailUrl = alarmConstants.getTcsr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
    String detailResponseStr = HttpClient.get(detailUrl);
    LOGGER.info(detailUrl + " " + detailResponseStr);
    List<AlarmConstants.TopicConsumerDetail> detailResponse = retrieveDetail(detailResponseStr);
    if (CollectionUtils.isEmpty(detailResponse)) {
        return -1L;
    }
    // Skip rows without a lag value so that unboxing cannot throw a NullPointerException
    Long lag = detailResponse.stream()
            .map(AlarmConstants.TopicConsumerDetail::getLag)
            .filter(Objects::nonNull)
            .mapToLong(Long::longValue)
            .sum();
    Long threshold = Long.valueOf(Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold());
    if (lag.compareTo(threshold) > 0) {
        return lag;
    }
    return -1L;
}
3. Filling in the logic of the two scheduled tasks
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    try {
        RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
        RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);

        // Right-hand task: check the target queue every 5 s
        taskRegistrar.addFixedRateTask(() -> {
            AlarmConstants.TCR tcr = topicConsumerQueue.poll();
            if (Objects.isNull(tcr)) {
                return;
            }
            // Check the threshold a second time and send the alert only if it is still exceeded
            Long lag = thresholdCheck(tcr);
            if (lag > 0 && ArrayUtils.isNotEmpty(alarmConstants.getPhones())) {
                String message = toShortMessage(tcr, lag);
                String templateId = alarmConstants.getTemplateId();
                LOGGER.info("Message center alert SMS content: " + message);
                for (String phone : alarmConstants.getPhones()) {
                    try {
                        MessageUtils.sendMessage(phone, messageUrl, message, templateId);
                    } catch (Exception e) {
                        LOGGER.error(String.format("Failed to send the message center alert SMS...%s %s %s", phone, messageUrl, message), e);
                    }
                }
            }
        }, 5 * 1000L);

        // Left-hand task: periodically check the Topic-Consumer thresholds; the period is dynamic
        taskRegistrar.addTriggerTask(() -> {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(commonRedisKey + "TopicConsumerForEach");
                // Try to acquire the lock, waiting at most 5 s; it is released automatically 5 s after locking
                if (!lock.tryLock(5, 5, TimeUnit.SECONDS)) {
                    return;
                }
                if (!CollectionUtils.isEmpty(alarmConstants.getTcrs())) {
                    alarmConstants.getTcrs().stream()
                            .filter(tcr -> !topicConsumerDelayedQueue.contains(tcr) && (thresholdCheck(tcr) > 0))
                            .forEach(tcr -> topicConsumerDelayedQueue.offer(tcr, alarmConstants.getDelay(), TimeUnit.SECONDS));
                }
            } catch (Exception e) {
                LOGGER.error("Message center topic-consumer scheduled task failed...", e);
            } finally {
                // Unlock only if this thread still holds the lock: tryLock may have failed,
                // or the 5 s lease may already have expired, and unlock() would then throw
                if (!Objects.isNull(lock) && lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }, triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    } catch (Exception e) {
        LOGGER.error("Failed to initialise the message-center topic-consumer scheduled tasks...", e);
    }
}
V. Source code of the alerting scheduled tasks
Follow the subscription account (算法和技术SHARING) and reply "kafka告警" to view it.