Alerting on Metricbeat and Heartbeat Data with Python

1. System Architecture

IP           Hostname    Role / Notes
11.0.1.11    kafka1      Kafka and MySQL
11.0.1.12    kafka2      Kafka
11.0.1.13    kafka3      Kafka
11.0.1.14    demo1       Metricbeat and Heartbeat

2. Deploying Kafka
Omitted here.
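If topic auto-creation is disabled on the brokers, the input topics can be created up front. A minimal sketch using kafka-python's admin client — the partition and replication counts are assumptions, and the downstream topics (ELK-system_metricbeat, ELK-system_heartbeat, ELK-alarm) can be created the same way:

from kafka.admin import KafkaAdminClient, NewTopic

# Create the topics the beats will write to (counts below are only an example)
admin = KafkaAdminClient(bootstrap_servers=["11.0.1.11:9092", "11.0.1.12:9092", "11.0.1.13:9092"])
admin.create_topics(new_topics=[
    NewTopic(name="ELK-metricbeat", num_partitions=3, replication_factor=2),
    NewTopic(name="ELK-heartbeat", num_partitions=3, replication_factor=2),
])
admin.close()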

3. Deploying Metricbeat and Heartbeat

Metricbeat configuration:

metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

fields:
  ip: 11.0.1.14

output.kafka:
  hosts: ["11.0.1.11:9092","11.0.1.12:9092","11.0.1.13:9092"]
  topic: "ELK-metricbeat"

Heartbeat configuration:

heartbeat.config.monitors:
  path: ${path.config}/monitors.d/*.yml
  reload.enabled: false
  reload.period: 5s

# ---------------------------- Kafka Output ----------------------------
output.kafka:
  hosts: ["11.0.1.11:9092","11.0.1.12:9092","11.0.1.13:9092"]
  topic: "ELK-heartbeat"

Heartbeat tcp.yml configuration:

- type: tcp
  id: my-tcp-monitor
  name: My TCP monitor
  enabled: true
  schedule: '@every 20s'
  hosts: ["11.0.1.14:80","11.0.1.13:80","11.0.1.12:80"]
  ipv4: true
  ipv6: true
  mode: all

4. MariaDB Table Structures

cmdb_app table (stores application/system information):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cmdb_app
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_app`;
CREATE TABLE `cmdb_app` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

Column descriptions:
app_name: application (system) name
ops_user: name of the operations engineer
ops_tel: phone number of the operations engineer
ops_dep: operations department responsible for the application

cmdb_os table (stores server information):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cmdb_os
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_os`;
CREATE TABLE `cmdb_os` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `eip` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

Column descriptions:
app_name: name of the application the server belongs to
eip: server IP address
module: purpose/role of the server
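The enrichment scripts below expect cmdb_app and cmdb_os to already contain matching rows. A minimal seeding sketch with pymysql — the values are hypothetical and simply mirror the enriched output shown later in this article:

import pymysql

# Insert one sample application and one sample server (values are illustrative only)
db = pymysql.connect(host="11.0.1.11", port=13306, user="root",
                     password="123456", database="zll_python_test")
with db.cursor() as cursor:
    cursor.execute(
        "INSERT INTO cmdb_app (app_name, ops_user, ops_tel, ops_dep) VALUES (%s, %s, %s, %s)",
        ("应用系统", "运维仔", "12345678901", "运维部"))
    cursor.execute(
        "INSERT INTO cmdb_os (app_name, eip, module) VALUES (%s, %s, %s)",
        ("应用系统", "11.0.1.14", "demo1"))
db.commit()
db.close()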

alert_list table (stores alert records):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for alert_list
-- ----------------------------
DROP TABLE IF EXISTS `alert_list`;
CREATE TABLE `alert_list` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `timestamp` datetime NULL DEFAULT NULL,
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `status` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

5. Python program: read data from Kafka, match cmdb_os and cmdb_app records against the ip field in each message, and write the enriched data to a new Kafka topic

Install the dependencies:

pip install kafka-python pymysql apscheduler pyyaml

First, metricbeat_replace.py:

import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


class DatabaseConnectionError(Exception):
    def __init__(self, message="数据库连接失败"):
        self.message = message
        super().__init__(self.message)


class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None
        # Initialize the scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Reload the CMDB data every 10 minutes (at :00, :10, :20, :30, :40 and :50)
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # File handler at DEBUG level, rotating at 1 MB, keeping 3 backups
        fh = RotatingFileHandler('metricbeat_replace.log', maxBytes=1e6, backupCount=3)
        fh.setLevel(logging.DEBUG)
        # Formatter
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # Attach the handlers to the logger
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def start_processing(self):
        self.connect_to_database()  # First database connection at start-up
        self.load_cmdb_data()       # Load the CMDB data into memory at start-up
        self.logger.info("开始处理...")
        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest')
        self.logger.info("Kafka 消费者已创建.")
        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka 生产者已创建.")
        try:
            for msg in consumer:
                metricbeat_data = msg.value.decode('utf-8')
                ip = self.extract_ip(metricbeat_data)
                cmdb_data = self.get_cmdb_data(ip)
                self.process_and_send_message(producer, metricbeat_data, cmdb_data)
        except KeyboardInterrupt:
            self.logger.info("接收到 KeyboardInterrupt。正在优雅地关闭。")
        except Exception as e:
            self.logger.error(f"发生错误:{str(e)}")
            # Add additional error handling here if needed
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None
        try:
            self.logger.info("正在连接数据库...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            self.logger.info("数据库连接成功.")
            self.db_connection_error_logged = False  # Reset the error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"连接数据库时发生错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("开始加载数据.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            cursor = db.cursor()
            # Query the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()
            # Query the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()
            # Cache the data in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }
            self.logger.info("数据加载完成.")
        except pymysql.Error as e:
            error_message = f"加载数据时发生数据库错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("数据库连接异常,继续沿用上次的 CMDB 数据.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_ip(metricbeat_data):
        data = json.loads(metricbeat_data)
        return data.get('fields', {}).get('ip', '')

    def get_cmdb_data(self, ip):
        if not self.cmdb_data:
            return None
        # Look the IP up in the in-memory CMDB cache
        cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
        if not cmdb_os_data:
            return None  # IP not found in cmdb_os
        cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
        if not cmdb_app_data:
            return None  # no matching application record in cmdb_app
        return cmdb_os_data, cmdb_app_data

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data
        new_message = json.loads(original_data_str)
        if cmdb_data:
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None
        producer.send(self.kafka_config['output_topic'],
                      value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    try:
        with open('application.yml', 'r') as config_file:
            config_data = yaml.safe_load(config_file)
        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})
        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()
    except FileNotFoundError:
        print("错误:找不到配置文件 'application.yml'。")
    except Exception as e:
        print(f"发生意外错误:{str(e)}")

The corresponding application.yml:

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'metricbeat_replace'
  input_topic: 'ELK-metricbeat'
  output_topic: 'ELK-system_metricbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'

The processed data looks like this:

{"@timestamp": "2024-01-20T14:02:34.706Z", "@metadata": {"beat": "metricbeat", "type": "_doc", "version": "8.11.1"}, "host": {"name": "demo1"}, "agent": {"type": "metricbeat", "version": "8.11.1", "ephemeral_id": "979b3ab7-80af-4ab5-a552-3692165b7000", "id": "982d0bd1-d0d9-45b5-bc78-0a5f25911c12", "name": "demo1"}, "metricset": {"name": "memory", "period": 10000}, "event": {"module": "system", "duration": 120280, "dataset": "system.memory"}, "service": {"type": "system"}, "system": {"memory": {"used": {"pct": 0.2325, "bytes": 919130112}, "free": 3034763264, "cached": 529936384, "actual": {"used": {"pct": 0.1493, "bytes": 590319616}, "free": 3363573760}, "swap": {"total": 2147479552, "used": {"bytes": 0, "pct": 0}, "free": 2147479552}, "total": 3953893376}}, "fields": {"ip": "11.0.1.14"}, "ecs": {"version": "8.0.0"}, "cmdb_data": {"app_name": "应用系统", "eip": "11.0.1.14", "module": "demo1", "ops_user": "运维仔", "ops_tel": "12345678901", "ops_dep": "运维部"}}
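To check quickly that the enrichment worked, you can tail the output topic with a throwaway consumer. A small sketch (the timeout and the printed fields are arbitrary choices):

import json
from kafka import KafkaConsumer

# Read the enriched events and print only the source IP and the attached cmdb_data
consumer = KafkaConsumer(
    "ELK-system_metricbeat",
    bootstrap_servers=["11.0.1.11:9092", "11.0.1.12:9092", "11.0.1.13:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000)  # stop iterating after 10 s without new messages

for msg in consumer:
    event = json.loads(msg.value.decode("utf-8"))
    print(event.get("fields", {}).get("ip"), "->", event.get("cmdb_data"))

consumer.close()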

heartbeat_replace.py:

import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


class DatabaseConnectionError(Exception):
    def __init__(self, message="数据库连接失败"):
        self.message = message
        super().__init__(self.message)


class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None
        # Initialize the scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Reload the CMDB data every 10 minutes (at :00, :10, :20, :30, :40 and :50)
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # File handler at DEBUG level, rotating at 1 MB, keeping 3 backups
        fh = RotatingFileHandler('heartbeat_replace.log', maxBytes=1e6, backupCount=3)
        fh.setLevel(logging.DEBUG)
        # Formatter
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # Attach the handlers to the logger
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def start_processing(self):
        self.connect_to_database()  # First database connection at start-up
        self.load_cmdb_data()       # Load the CMDB data into memory at start-up
        self.logger.info("开始处理...")
        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest')
        self.logger.info("Kafka 消费者已创建.")
        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka 生产者已创建.")
        try:
            for msg in consumer:
                heartbeat_data = msg.value.decode('utf-8')
                ip = self.extract_url_domain(heartbeat_data)
                cmdb_data = self.get_cmdb_data(ip)
                self.process_and_send_message(producer, heartbeat_data, cmdb_data)
        except KeyboardInterrupt:
            self.logger.info("接收到 KeyboardInterrupt。正在优雅地关闭。")
        except Exception as e:
            self.logger.error(f"发生错误:{str(e)}")
            # Add additional error handling here if needed
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None
        try:
            self.logger.info("正在连接数据库...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            self.logger.info("数据库连接成功.")
            self.db_connection_error_logged = False  # Reset the error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"连接数据库时发生错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("开始加载数据.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            cursor = db.cursor()
            # Query the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()
            # Query the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()
            # Cache the data in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }
            self.logger.info("数据加载完成.")
        except pymysql.Error as e:
            error_message = f"加载数据时发生数据库错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("数据库连接异常,继续沿用上次的 CMDB 数据.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_url_domain(heartbeat_data):
        data = json.loads(heartbeat_data)
        return data.get('url', {}).get('domain', '')

    def get_cmdb_data(self, ip):
        if not self.cmdb_data:
            return None
        # Look the IP (url.domain) up in the in-memory CMDB cache
        cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
        if not cmdb_os_data:
            return None  # IP not found in cmdb_os
        cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
        if not cmdb_app_data:
            return None  # no matching application record in cmdb_app
        return cmdb_os_data, cmdb_app_data

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data
        new_message = json.loads(original_data_str)
        if cmdb_data:
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None
        producer.send(self.kafka_config['output_topic'],
                      value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    try:
        with open('application.yml', 'r') as config_file:
            config_data = yaml.safe_load(config_file)
        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})
        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()
    except FileNotFoundError:
        print("错误:找不到配置文件 'application.yml'。")
    except Exception as e:
        print(f"发生意外错误:{str(e)}")

The corresponding application.yml:

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'heartbeat_replace'
  input_topic: 'ELK-heartbeat'
  output_topic: 'ELK-system_heartbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'

The processed data looks like this:

{"@timestamp": "2024-01-20T14:03:37.102Z", "@metadata": {"beat": "heartbeat", "type": "_doc", "version": "8.11.1"}, "monitor": {"name": "My ICMP Monitor", "type": "icmp", "id": "my-icmp-monitor", "status": "up", "check_group": "b4caac6d-b79c-11ee-bf86-000c29a1adec-1", "duration": {"us": 131}, "ip": "11.0.1.14", "timespan": {"gte": "2024-01-20T14:03:37.102Z", "lt": "2024-01-20T14:03:53.102Z"}}, "url": {"domain": "11.0.1.14", "full": "icmp://11.0.1.14", "scheme": "icmp"}, "fields": {"nodename": "demo1"}, "summary": {"retry_group": "b4caac6d-b79c-11ee-bf86-000c29a1adec", "attempt": 1, "max_attempts": 1, "final_attempt": true, "up": 1, "down": 0, "status": "up"}, "state": {"id": "default-18d1d73022a-0", "up": 32661, "down": 0, "ends": null, "started_at": "2024-01-19T00:41:32.970059265+08:00", "duration_ms": "163324132", "status": "up", "checks": 32661, "flap_history": []}, "event": {"type": "heartbeat/summary", "dataset": "icmp"}, "icmp": {"requests": 1, "rtt": {"us": 83}}, "ecs": {"version": "8.0.0"}, "agent": {"name": "demo1", "type": "heartbeat", "version": "8.11.1", "ephemeral_id": "46819a45-3552-4e57-91f3-e58ffb12c72a", "id": "d56462aa-6f6b-4237-8bfc-a93c7bf933f4"}, "cmdb_data": {"app_name": "应用系统", "eip": "11.0.1.14", "module": "demo1", "ops_user": "运维仔", "ops_tel": "12345678901", "ops_dep": "运维部"}}

Overall, metricbeat_replace.py and heartbeat_replace.py are almost identical: apart from a few places where "metricbeat" is swapped for "heartbeat", the only real difference is return data.get('fields', {}).get('ip', '') versus return data.get('url', {}).get('domain', '').

6. Heartbeat Alerting

heartbeat_alarm.py:

# heartbeat_alarm.py
import json
import logging
import mysql.connector
from collections import defaultdict
from datetime import datetime, timedelta
from kafka import KafkaConsumer
import yaml

# Configure the logger
logging.basicConfig(
    level=logging.INFO,
    filename='heartbeat_checker.log',
    format='%(asctime)s [%(levelname)s] - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class HeartbeatChecker:
    def __init__(self, config_path='application.yml'):
        # Initialize the HeartbeatChecker object
        self.config_path = config_path
        self.kafka_bootstrap_servers = None
        self.kafka_group_id = None
        self.kafka_topic = None
        self.mysql_host = None
        self.mysql_port = None
        self.mysql_user = None
        self.mysql_password = None
        self.mysql_database = None
        self.consecutive_down_threshold = None
        self.consecutive_up_threshold = None
        # Load the configuration from the YAML file
        self.load_config()
        self.kafka_consumer = None

    def load_config(self):
        try:
            # Load the configuration from the YAML file
            with open(self.config_path, 'r') as file:
                config = yaml.safe_load(file)
            # Kafka settings
            self.kafka_bootstrap_servers = config['kafka']['bootstrap_servers']
            self.kafka_group_id = config['kafka']['group_id']
            self.kafka_topic = config['kafka']['topic']
            # MySQL settings
            self.mysql_host = config['mysql']['host']
            self.mysql_port = config['mysql']['port']
            self.mysql_user = config['mysql']['user']
            self.mysql_password = config['mysql']['password']
            self.mysql_database = config['mysql']['database']
            # Consecutive down / consecutive up thresholds
            self.consecutive_down_threshold = config['thresholds']['consecutive_down']
            self.consecutive_up_threshold = config['thresholds']['consecutive_up']
        except Exception as e:
            # Handle configuration loading errors
            logger.error(f"加载配置时发生错误: {e}")
            raise

    def create_kafka_consumer(self):
        try:
            # Create the Kafka consumer
            self.kafka_consumer = KafkaConsumer(
                self.kafka_topic,
                bootstrap_servers=self.kafka_bootstrap_servers,
                group_id=self.kafka_group_id,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x.decode('utf-8')))
        except Exception as e:
            # Handle consumer creation errors
            logger.error(f"创建 Kafka Consumer 时发生错误: {e}")
            raise

    def check_heartbeat_alerts(self):
        # defaultdict holding the list of recent monitor statuses per URL
        url_groups = defaultdict(list)
        mysql_connection = None
        try:
            # Create the Kafka consumer and connect to MySQL
            self.create_kafka_consumer()
            mysql_connection = mysql.connector.connect(
                host=self.mysql_host,
                port=self.mysql_port,
                user=self.mysql_user,
                password=self.mysql_password,
                database=self.mysql_database)
            mysql_cursor = mysql_connection.cursor()
            # Iterate over the Kafka messages
            for message in self.kafka_consumer:
                json_data = message.value
                url = json_data.get('url', {}).get('full')
                monitor_status = json_data.get('monitor', {}).get('status')
                timestamp_str = json_data.get('@timestamp')
                cmdb_data = json_data.get('cmdb_data')
                if url and monitor_status and timestamp_str:
                    timestamp = self.convert_to_local_time(timestamp_str)
                    # Handle consecutive "up" results
                    if monitor_status == 'up' and self.url_exists_down_in_mysql(mysql_cursor, url):
                        url_groups[url].append(monitor_status)
                        mysql_cursor.fetchall()
                        if len(url_groups[url]) >= self.consecutive_up_threshold and all(
                                status == 'up' for status in url_groups[url][-self.consecutive_up_threshold:]):
                            self.delete_from_mysql(mysql_cursor, url, mysql_connection)
                            logger.info(f"URL: {url} - 由于连续 up 被从 MySQL 中移除")
                    else:
                        # Handle consecutive "down" results
                        if monitor_status == 'down' and not self.url_exists_down_in_mysql(mysql_cursor, url):
                            url_groups[url].append(monitor_status)
                            mysql_cursor.fetchall()
                            if len(url_groups[url]) >= self.consecutive_down_threshold and all(
                                    status == 'down' for status in url_groups[url][-self.consecutive_down_threshold:]):
                                self.send_alert(url)
                                self.write_to_mysql(mysql_cursor, timestamp, url, monitor_status,
                                                    mysql_connection, cmdb_data)
                                url_groups[url] = []
                                logger.info(f"URL: {url} - 被添加到 MySQL 中")
                        elif monitor_status == 'up' and self.url_exists_down_in_mysql(mysql_cursor, url):
                            url_groups[url].append(monitor_status)
                            mysql_cursor.fetchall()
                            if len(url_groups[url]) >= self.consecutive_up_threshold and all(
                                    status == 'up' for status in url_groups[url][-self.consecutive_up_threshold:]):
                                self.delete_from_mysql(mysql_cursor, url, mysql_connection)
                                url_groups[url] = []
                                logger.info(f"URL: {url} - 由于连续 up 被从 MySQL 中移除")
        except Exception as e:
            # Handle runtime errors
            logger.error(f"发生错误: {e}")
        finally:
            # Close the Kafka consumer and the MySQL connection
            if self.kafka_consumer:
                self.kafka_consumer.close()
            if mysql_connection:
                mysql_connection.close()

    def send_alert(self, url):
        # Log the alert
        logger.info(f"告警: URL {url} 连续 {self.consecutive_down_threshold} 次掉线")

    @staticmethod
    def write_to_mysql(cursor, timestamp, url, status, connection, cmdb_data=None):
        try:
            # Insert the alert into MySQL, including the cmdb_data fields
            insert_query = """
                INSERT INTO alert_list (timestamp, url, status, app_name, module, ops_user, ops_tel, ops_dep)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                timestamp,
                url,
                status,
                cmdb_data.get('app_name', '') if cmdb_data else '',
                cmdb_data.get('module', '') if cmdb_data else '',
                cmdb_data.get('ops_user', '') if cmdb_data else '',
                cmdb_data.get('ops_tel', '') if cmdb_data else '',
                cmdb_data.get('ops_dep', '') if cmdb_data else ''
            ) if cmdb_data else (timestamp, url, status, '', '', '', '', ''))
            connection.commit()
            logging.info(f"Inserted into MySQL: URL {url}, Status {status}, cmdb_data {cmdb_data}")
        except Exception as e:
            # Handle MySQL write errors
            logger.error(f"Error writing to MySQL: {e}")

    @staticmethod
    def delete_from_mysql(cursor, url, connection):
        try:
            # Delete the alert row from MySQL
            delete_query = "DELETE FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(delete_query, (url,))
            connection.commit()
            logging.info(f"从 MySQL 中删除: URL {url}")
        except Exception as e:
            # Handle MySQL delete errors
            logger.error(f"从 MySQL 中删除时发生错误: {e}")

    @staticmethod
    def url_exists_down_in_mysql(cursor, url):
        try:
            # Check whether a "down" record for this URL already exists in MySQL
            query = "SELECT * FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(query, (url,))
            return bool(cursor.fetchone())
        except Exception as e:
            # Handle lookup errors
            logger.error(f"检查 URL 是否存在于 MySQL 中时发生错误: {e}")
            return False

    @staticmethod
    def convert_to_local_time(timestamp_str):
        # Convert the UTC timestamp to local (UTC+8) time
        timestamp_utc = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        timestamp_local = timestamp_utc + timedelta(hours=8)
        return timestamp_local.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    try:
        # Run the main program
        heartbeat_checker = HeartbeatChecker()
        heartbeat_checker.check_heartbeat_alerts()
    except KeyboardInterrupt:
        print("退出...")

application.yml:

# application.yml
kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  group_id: 'python_alert'
  topic: 'ELK-system_heartbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  database: 'zll_python_test'

thresholds:
  consecutive_down: 1
  consecutive_up: 1

Here consecutive_down is how many consecutive "down" results are required before an alert fires, and consecutive_up is how many consecutive "up" results are required before the alert is cleared.
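The check only looks at the most recent statuses recorded for a URL. A tiny standalone illustration of that sliding check, with made-up sample values:

# Same threshold check as in check_heartbeat_alerts, shown in isolation (sample data only)
consecutive_down_threshold = 3
statuses = ["up", "down", "down", "down"]  # statuses observed so far for one URL

if (len(statuses) >= consecutive_down_threshold
        and all(s == "down" for s in statuses[-consecutive_down_threshold:])):
    print("trigger alert and write the URL to alert_list")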

7. Metricbeat Alerting

Metricbeat offers more metrics to alert on, such as CPU, memory, and filesystem usage. The Python code is as follows:

import logging
from logging.handlers import RotatingFileHandler
from kafka import KafkaConsumer, KafkaProducer
import yaml
import json


class KafkaAlertProcessor:
    def __init__(self, config_path):
        # Configure logging to both console and file
        self.configure_logging()
        with open(config_path, 'r', encoding='utf-8') as config_file:
            config = yaml.safe_load(config_file)
        self.kafka_brokers = config['kafka']['brokers']
        self.input_topic = config['kafka']['input_topic']
        self.output_topic = config['kafka']['output_topic']
        self.group_id = config['kafka']['group_id']
        self.cpu_alert_threshold = config['alert_thresholds']['cpu']
        self.memory_alert_threshold = config['alert_thresholds']['memory']
        self.filesystem_alert_threshold = config['alert_thresholds']['filesystem']
        self.common_template = config['alert_templates']['common']
        self.cpu_alert_template = config['alert_templates']['cpu']
        self.memory_alert_template = config['alert_templates']['memory']
        self.filesystem_alert_template = config['alert_templates']['filesystem']
        self.consumer = None
        self.producer = None

    @staticmethod
    def configure_logging():
        # Log to both console and file
        logger = logging.getLogger('')
        logger.setLevel(logging.INFO)
        # Console output
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        # File output, rotated by size: at most 10 backups of 1 MB each
        file_handler = RotatingFileHandler('metricbeat_alarm.log', maxBytes=1000000, backupCount=10, delay=False)
        file_handler.setLevel(logging.INFO)
        # Log format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        file_handler.setFormatter(formatter)
        # Attach the handlers to the logger
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)

    def initialize_consumer(self):
        self.consumer = KafkaConsumer(
            self.input_topic,
            group_id=self.group_id,
            bootstrap_servers=','.join(self.kafka_brokers),
            auto_offset_reset='latest',
            enable_auto_commit=False,
        )

    def initialize_producer(self):
        self.producer = KafkaProducer(
            bootstrap_servers=','.join(self.kafka_brokers),
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'))

    def process_alert(self, data):
        cmdb_data = data.get("cmdb_data", {})
        common_message = self.common_template.format(
            cmdb_data_eip=cmdb_data.get("eip", ""),
            cmdb_data_app_name=cmdb_data.get("app_name", ""),
            cmdb_data_module=cmdb_data.get("module", ""),
            cmdb_data_ops_user=cmdb_data.get("ops_user", ""),
            cmdb_data_ops_tel=cmdb_data.get("ops_tel", ""),
            cmdb_data_ops_dep=cmdb_data.get("ops_dep", ""))

        # Check CPU usage
        if "system" in data and "cpu" in data["system"] and "total" in data["system"]["cpu"] and "pct" in \
                data["system"]["cpu"]["total"]:
            cpu_usage = data["system"]["cpu"]["total"]["pct"] / data["system"]["cpu"]["cores"]
            if cpu_usage > self.cpu_alert_threshold:
                cpu_alert_message = self.cpu_alert_template.format(cpu_usage=cpu_usage)
                alert_message = cpu_alert_message + common_message
                self.send_alert("CPU 告警", alert_message)

        # Check memory usage
        if "system" in data and "memory" in data["system"] and "actual" in data["system"]["memory"] and "used" in \
                data["system"]["memory"]['actual'] and 'pct' in data["system"]["memory"]['actual']['used']:
            memory_usage = data["system"]["memory"]["actual"]["used"]["pct"]
            if memory_usage > self.memory_alert_threshold:
                memory_alert_message = self.memory_alert_template.format(memory_actual_used_pct=memory_usage)
                alert_message = memory_alert_message + common_message
                self.send_alert("内存告警", alert_message)

        # Check filesystem usage
        if "system" in data and "filesystem" in data["system"] and "used" in data["system"]["filesystem"] and "pct" in \
                data["system"]["filesystem"]["used"]:
            filesystem_usage = data["system"]["filesystem"]["used"]["pct"]
            if filesystem_usage > self.filesystem_alert_threshold:
                fs_alert_message = self.filesystem_alert_template.format(
                    filesystem_used_pct=filesystem_usage,
                    filesystem_mount_point=data["system"]["filesystem"].get("mount_point", "Unknown"))
                alert_message = fs_alert_message + common_message
                self.send_alert("文件系统告警", alert_message)

    def send_alert(self, alert_type, alert_message):
        formatted_message = f"{alert_type} - {alert_message}"
        logging.warning(formatted_message)
        self.producer.send(self.output_topic,
                           value={"alert_type": alert_type, "alert_message": formatted_message})

    def run(self):
        self.initialize_consumer()
        self.initialize_producer()
        try:
            for msg in self.consumer:
                try:
                    data = json.loads(msg.value.decode('utf-8'))
                    self.process_alert(data)
                except json.JSONDecodeError as e:
                    logging.error(f"解码 JSON 错误: {e}")
        except KeyboardInterrupt:
            pass
        finally:
            if self.consumer:
                self.consumer.close()


if __name__ == "__main__":
    kafka_alert_processor = KafkaAlertProcessor(config_path="application.yml")
    kafka_alert_processor.run()

The application.yml file:

kafka:
  brokers:
    - "11.0.1.11:9092"
    - "11.0.1.12:9092"
    - "11.0.1.13:9092"
  input_topic: "ELK-system_metricbeat"
  output_topic: "ELK-alarm"
  group_id: "ELK-alarm"

alert_thresholds:
  cpu: 0.01
  memory: 0.1
  filesystem: 0.1

alert_templates:
  common: "IP:{cmdb_data_eip}\n系统名称:{cmdb_data_app_name}\n模块:{cmdb_data_module}\n运维责任人:{cmdb_data_ops_user}\n电话:{cmdb_data_ops_tel}\n责任部门:{cmdb_data_ops_dep}\n"
  cpu: "【CPU使用率告警】\n告警信息:CPU 使用率 超过阈值。当前平均值:{cpu_usage}.\n"
  memory: "【内存使用率告警】\n告警信息:内存 使用率 超过阈值。当前值:{memory_actual_used_pct}.\n"
  filesystem: "【文件系统使用率告警】\n告警信息:文件系统 使用率 超过阈值。当前值:{filesystem_used_pct}. 挂载点:{filesystem_mount_point}.\n"

The resulting alert messages look like this:

【内存使用率告警】
告警信息:内存 使用率 超过阈值。当前值:0.1502.
IP:11.0.1.14
系统名称:应用系统
模块:demo1
运维责任人:运维仔
电话:12345678901
责任部门:运维部

【文件系统使用率告警】
告警信息:文件系统 使用率 超过阈值。当前值:0.1178. 挂载点:/.
IP:11.0.1.14
系统名称:应用系统
模块:demo1
运维责任人:运维仔
电话:12345678901
责任部门:运维部

【CPU使用率告警】
告警信息:CPU 使用率 超过阈值。当前平均值:0.002.
IP:11.0.1.14
系统名称:应用系统
模块:demo1
运维责任人:运维仔
电话:12345678901
责任部门:运维部

At the moment the alerts go to Kafka, the console, and the log file; depending on your needs, you can instead push them to an HTTP API, Redis, or a database.
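For example, here is a minimal sketch that forwards each alert to an HTTP webhook with the requests library; the endpoint URL is a placeholder and the payload shape is only an assumption:

import requests

def send_alert_to_webhook(alert_type, alert_message):
    # Push a single alert to an external alerting endpoint (URL is a placeholder)
    payload = {"alert_type": alert_type, "alert_message": alert_message}
    try:
        resp = requests.post("http://example.com/alert-webhook", json=payload, timeout=5)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"Failed to deliver alert: {e}")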

8. Alert Recovery

Still a work in progress.
