零、人在地球
一个成熟的生产环境中会运行大量SQL,而测试环境无法完全复现生产环境的数据情况,因此需要对生产数据做一些监控。
一般用于监控:
①一些重点sql是否正常(某些极端数据导致查询异常)
②一些跑批任务的执行结果
③一些表是否有数据(比如按日更新监控增量)
理论上任何只读的查询SQL都可以纳入监控(本文实现要求使用 count 语句或带 limit 的 select 语句),结果不符合预期时直接发送预警消息。
一、代码
1、依赖
implementation 'org.springframework.boot:spring-boot-starter-quartz'
2、SQL表结构
T_JIANKONG 监控主表
-- Monitor definitions: one row per monitored SQL; rows with stateCode = '0' are loaded
-- by the getDataMonitor mapper and scheduled as Quartz jobs.
-- beanName (nullable) names an optional custom check bean; the Java code reads it via
-- EntityJob.getBeanName(), so the column must exist for that feature to work.
CREATE TABLE T_JIANKONG (
  `id` char(32) NOT NULL COMMENT '主键',
  `childTaskName` varchar(64) NOT NULL COMMENT '子任务名称',
  `taskID` varchar(32) NOT NULL COMMENT '主任务ID 用户关联 推送、数据接收等需求的ID',
  `schema` varchar(255) DEFAULT NULL COMMENT '数据库名',
  `tableName` varchar(255) DEFAULT NULL COMMENT '表名,单表监控时填写',
  `cron` varchar(32) DEFAULT NULL COMMENT 'cron表达式',
  `executeSql` text COMMENT '执行sql',
  `excuteResult` text COMMENT '执行结果',
  `excuteTime` varchar(32) DEFAULT NULL COMMENT '执行时间',
  `principal` varchar(255) DEFAULT NULL COMMENT '推送地址:1,个人企微ID,2,企微群',
  `principalName` varchar(255) DEFAULT NULL COMMENT '负责人名称',
  `sendType` varchar(4) NOT NULL COMMENT '发送类型 1: 个人 2 发送群',
  `formula` varchar(255) DEFAULT NULL COMMENT '判断公式:等于/大于/小于',
  `compareResult` varchar(255) DEFAULT NULL COMMENT '比较结果',
  `checkResult` varchar(32) DEFAULT NULL COMMENT '结果判断 1通过 0不通过 -1执行异常',
  `beanName` varchar(64) DEFAULT NULL COMMENT '自定义校验Bean名称(可选)',
  `stateCode` varchar(100) DEFAULT '0' COMMENT '是否删除 0正常 1删除',
  PRIMARY KEY (`id`, `childTaskName`) USING BTREE,
  KEY `INX_TASKID` (`taskID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据监控表';
T_JIANKONG_LOG 监控日志表
-- History of monitor executions: one row is inserted per run.
-- stateCode uses the same convention as T_JIANKONG (0 = normal, 1 = deleted); the
-- insert statement in the job writes '0'.
CREATE TABLE T_JIANKONG_LOG (
  `id` char(32) NOT NULL COMMENT '主键',
  `childTaskName` varchar(64) DEFAULT NULL COMMENT '子任务名称',
  `excuteResult` text COMMENT '执行结果',
  `excuteTime` datetime DEFAULT NULL COMMENT '执行时间',
  `checkResult` varchar(32) DEFAULT NULL COMMENT '结果判断',
  `isSend` varchar(32) DEFAULT NULL COMMENT '是否发送告警信息',
  `stateCode` varchar(100) DEFAULT '0' COMMENT '是否删除 0正常 1删除',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `INX_CHILDTASKNAME` (`childTaskName`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据监控历史表';
3、监控代码
主体类 EntityJob.java
/**
 * One data-monitor definition, populated from a {@code T_JIANKONG} row by the
 * {@code getDataMonitor} MyBatis query ({@code select *} with {@code resultType=EntityJob}),
 * and stored in the Quartz {@code JobDataMap} of the job that runs it.
 *
 * <p>MyBatis maps columns to properties by name through the setters below.
 * NOTE(review): {@code executeResult} does not match the column spelling
 * {@code excuteResult}, and {@code taskName}/{@code beanName} may have no matching
 * column — verify against the table definition; unmatched properties stay {@code null}.
 */
public class EntityJob {
    private String id;
    /** Quartz cron expression controlling when the monitor runs. */
    private String cron;
    private String taskName;
    private String childTaskName;
    private String executeResult;
    /** Monitor SQL; may contain the placeholder {@code {schema}}. */
    private String executeSql;
    private String excuteTime;
    /** Alert target: a personal WeCom id or a group, depending on {@link #sendType}. */
    private String principal;
    /** "2" sends to a WeCom group; any other value sends to an individual. */
    private String sendType;
    /** Comparison operator name, e.g. 等于 / 大于 / 小于. */
    private String formula;
    /** Expected value the SQL result is compared against. */
    private String compareResult;
    private String checkResult;
    /** "0" means active. */
    private String stateCode;
    private String principalName;
    /** Substituted for {@code {schema}} in {@link #executeSql}. */
    private String schema;
    /** Optional name of a custom check bean. */
    private String beanName;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getCron() { return cron; }
    public void setCron(String cron) { this.cron = cron; }

    public String getTaskName() { return taskName; }
    public void setTaskName(String taskName) { this.taskName = taskName; }

    public String getChildTaskName() { return childTaskName; }
    public void setChildTaskName(String childTaskName) { this.childTaskName = childTaskName; }

    public String getExecuteResult() { return executeResult; }
    public void setExecuteResult(String executeResult) { this.executeResult = executeResult; }

    public String getExecuteSql() { return executeSql; }
    public void setExecuteSql(String executeSql) { this.executeSql = executeSql; }

    public String getExcuteTime() { return excuteTime; }
    public void setExcuteTime(String excuteTime) { this.excuteTime = excuteTime; }

    public String getPrincipal() { return principal; }
    public void setPrincipal(String principal) { this.principal = principal; }

    public String getSendType() { return sendType; }
    public void setSendType(String sendType) { this.sendType = sendType; }

    public String getFormula() { return formula; }
    public void setFormula(String formula) { this.formula = formula; }

    public String getCompareResult() { return compareResult; }
    public void setCompareResult(String compareResult) { this.compareResult = compareResult; }

    public String getCheckResult() { return checkResult; }
    public void setCheckResult(String checkResult) { this.checkResult = checkResult; }

    public String getStateCode() { return stateCode; }
    public void setStateCode(String stateCode) { this.stateCode = stateCode; }

    public String getPrincipalName() { return principalName; }
    public void setPrincipalName(String principalName) { this.principalName = principalName; }

    public String getSchema() { return schema; }
    public void setSchema(String schema) { this.schema = schema; }

    public String getBeanName() { return beanName; }
    public void setBeanName(String beanName) { this.beanName = beanName; }
}
监控服务Service DatamonitorService.java
@Service
public class DatamonitorService {public static final Logger logger = LoggerFactory.getLogger(DatamonitorService.class);@Resourceprivate SqlSession sqlSession;@Resourceprivate Scheduler scheduler ;private static final String DEFAULT_JOB_GROUP = "default_job_group";@PostConstructpublic void init(){Map<String,String> params = org.elasticsearch.common.collect.Map.of("stateCode","0");List<EntityJob> list = sqlSession.selectList("com.longze.gsh.dao.getDataMonitor",params);for(EntityJob m : list){createJob(m);}}public void createJob (EntityJob entityJob){try {if (!CronExpression.isValidExpression(entityJob.getCron())){XxlJobHelper.log("cron is invalid");return ;}String sql = entityJob.getExecuteSql().toUpperCase(Locale.ROOT);if(sql.contains("UPDATE") || sql.contains("DELETE") || sql.contains("DROP") || sql.contains("TRUNCATE") || sql.contains("INSERT") ){throw new SchedulerException("非法sql语句,只能使用select 语句");}if(!sql.contains("COUNT") && !sql.contains(" LIMIT ") ){throw new SchedulerException("数据监控只能使用count 语句");}// 构建任务JobDetail jobDetail = JobBuilder.newJob(QuartzJob.class).withIdentity(JobKey.jobKey(entityJob.getId(),DEFAULT_JOB_GROUP)).build() ;// 构建Cron调度器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(entityJob.getCron()).withMisfireHandlingInstructionDoNothing() ;// 不补偿// 任务触发器CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(entityJob.getId()).withSchedule(scheduleBuilder).build() ;jobDetail.getJobDataMap().put(entityJob.getId(),entityJob);scheduler.scheduleJob(jobDetail,trigger) ;logger.info("添加新任务:"+ entityJob.getChildTaskName());} catch (SchedulerException e){logger.error("添加任务失败,任务名称:"+entityJob.getTaskName()+" : "+ e.getMessage());}}public void updateJob(EntityJob entityJob) {try {// 查询触发器Keyif (!CronExpression.isValidExpression(entityJob.getCron())){XxlJobHelper.log("cron is invalid");throw new SchedulerException("cron is invalid");}//JobKey jobKey = getJobKey(entityJob);// 可能会更新 sql 和 cron 
简单处理直接重建任务deleteJob(entityJob);createJob(entityJob);// TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
// CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(entityJob.getCron());
// CronTrigger trigger = TriggerBuilder.newTrigger()
// .withIdentity(entityJob.getId())
// .withSchedule(cronScheduleBuilder)
// .build();
// scheduler.rescheduleJob(triggerKey, trigger);} catch (SchedulerException e) {XxlJobHelper.log("updateJob Fail,{}",e) ;}}public void deleteJob(EntityJob job) throws SchedulerException {JobKey key = getJobKey(job);scheduler.pauseJob(key);scheduler.unscheduleJob(TriggerKey.triggerKey(key.getName(), key.getGroup()));scheduler.deleteJob(key);}//获取JobDetail,JobDetail是任务的定义,而Job是任务的执行逻辑,JobDetail里会引用一个Job Class来定义public JobDetail getJobDetail(JobKey jobKey, String description, JobDataMap map) {return JobBuilder.newJob(QuartzJob.class).withIdentity(jobKey).withDescription(description).setJobData(map).storeDurably().build();}//获取Trigger (Job的触发器,执行规则)public Trigger getTrigger(String triggerID) {return TriggerBuilder.newTrigger().withIdentity(triggerID).build();}//获取JobKey,包含Name和Grouppublic JobKey getJobKey(EntityJob job) {return JobKey.jobKey(job.getId());}}
4、定时器类
public class QuartzJob implements Job {@Resourceprivate com.alibaba.druid.pool.DruidDataSource dataSource;@Autowiredprivate Map<String, MonitorProcessFactory> factoryMap;@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);String updateSql = "update T_JIANKONG SET excuteResult='%s',excuteTime=NOW(),checkResult='%s' where id='%s' ";String insertSql = "INSERT INTO `T_JIANKONG_LOG`(`id`, `childTaskName`, `excuteResult`, `excuteTime`, " +"`checkResult`, `isSend`, `stateCode`) VALUES (UUID_FACTORY(), '%s', '%s', NOW(), '%s', '%s', '0')";String content = "监控任务:%s \n" +"监控结果:%s \n" +"期待结果:%s \n" +"负责人:%s \n" +"执行时间:%s";JobDetail jobDetail = context.getJobDetail();JobKey jobKey = context.getTrigger().getJobKey();JobDataMap jobDataMap = jobDetail.getJobDataMap();EntityJob entityJob = (EntityJob) jobDataMap.get(jobKey.getName());XxlJobHelper.log("执行检测 :" + entityJob.getTaskName());long r = 0;String checkResult = "";String isSend = "0";MonitorProcessFactory processFactory = factoryMap.get(entityJob.getBeanName());if (!StringUtils.isEmpty(entityJob.getBeanName()) && null != processFactory) {checkResult = processFactory.check(entityJob) ? "1" : "0";}try {String excuteSql = entityJob.getExecuteSql().replaceAll("\\{schema}", entityJob.getSchema());r = jdbcTemplate.query(excuteSql, rs -> {if (rs.next()) {return rs.getLong(1);} else {return 0l;}});checkResult = check(entityJob.getFormula(), entityJob.getCompareResult(), r) ? 
"1" : "0";//发送预警if (checkResult.equals("0") && !StringUtils.isEmpty(entityJob.getPrincipal())) {if (entityJob.getSendType().equals("2")) {// 发送企微群RobotText robotText = new RobotText();robotText.setContent(String.format(content, entityJob.getChildTaskName(), r, entityJob.getFormula()+entityJob.getCompareResult(), entityJob.getPrincipalName(),DateUtil.formatDate(new Date(), DateUtil.DEFAULT_DATETIME_PATTERN)));robotPushService.sendRobotText("QuartzJob", entityJob.getPrincipal(), robotText, "1", null, null);} else {Text text = new Text();text.setContent(String.format(content, entityJob.getChildTaskName(), r, entityJob.getCompareResult(), entityJob.getPrincipalName(),DateUtil.formatDate(new Date(), DateUtil.DEFAULT_DATETIME_PATTERN)));wechatOfficService.sendTemplateText(entityJob.getPrincipal(), Arrays.asList(entityJob.getPrincipal()), text, entityJob.getChildTaskName(), null);}}} catch (Exception e) {// GET MSGcheckResult = "-1";} finally {//UPDATEString executeSql = String.format(updateSql, r, checkResult, entityJob.getId());jdbcTemplate.execute(executeSql);//插入历史记录表String executeSql2 = String.format(insertSql, entityJob.getChildTaskName(), r, checkResult, isSend);jdbcTemplate.execute(executeSql2);}}private boolean check(String formula, String value, long result) {if (formula.equals("等于")) {return Long.parseLong(value) == (result);}if (formula.equals("大于")) {return Long.parseLong(value) < (result);}if (formula.equals("小于")) {return Long.parseLong(value) > (result);}return true;}
}
5、比较方法工厂
可以在监控配置中指定一个 Spring Bean 名称(beanName),为该监控项设置自定义的比较规则。
MonitorCheckFactory.java
@Component
public interface MonitorCheckFactory {// 特殊判断处理方法Boolean check(EntityJob entityJob);}
Test1Monitor.java
@Service("Test1Monitor")
public class Test1Monitor implements MonitorProcessFactory {
// @Resource
// private DruidDataSource dataSource;@Overridepublic Boolean check(EntityJob entityJob) {
// String executeSql = entityJob.getExecuteSql().replaceAll("\\{schema}", entityJob.getSchema());
// JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// jdbcTemplate.execute(executeSql);return null;}
}
6、项目每次启动时会加载所有已配置的监控SQL。如果在项目运行期间修改了数据库中配置的SQL或cron,正在执行的监控不会自动更新,因此需要一个单独的跑批任务来刷新监控配置。
DataMonitorSchedule.java
@Component
public class DataMonitorSchedule {@Resourceprivate SqlSession sqlSession;@Resourceprivate DatamonitorService datamonitorService;@Resourceprivate com.alibaba.druid.pool.DruidDataSource dataSource;@XxlJob("updateDataMonitor")public void updateJob() {String childTaskName ="";if(!StringUtils.isEmpty(XxlJobHelper.getJobParam())){childTaskName = XxlJobHelper.getJobParam();}Map<String,String> params = org.elasticsearch.common.collect.Map.of("stateCode","0","childTaskName",childTaskName);List<EntityJob> list = sqlSession.selectList("com.longze.gsh.dao.getDataMonitor",params);for(EntityJob m : list){datamonitorService.updateJob(m);}}
}
7、sql mapper文件
<!-- Loads monitor definitions; each filter is applied only when the parameter is non-empty. -->
<select id="getDataMonitor" resultType="com.longze.gsh.dataMonitor.EntityJob" parameterType="map">
    select * from T_JIANKONG
    <where>
        <if test="stateCode != null and stateCode != ''">and stateCode = #{stateCode}</if>
        <if test="taskID != null and taskID != ''">and taskID = #{taskID}</if>
        <if test="childTaskName != null and childTaskName != ''">and childTaskName = #{childTaskName}</if>
    </where>
</select>