筛选日志文件内最近一个小时内的日志并将匹配项及后三行编辑成内容发送给钉钉
import time
import os
import requests
import re
from datetime import datetime, timedelta# 监控的日志文件根路径
log_root_path = '/data/logs/'# 钉钉机器人 Webhook URL
dingding_webhook = 'https://oapi.dingtalk.com/robot/send?access_token=2622064ca3b67285ceaec7ee215722d6a31028aa72775e1adee605bfd5d2d719'# 正则表达式匹配 ERROR 字样
error_pattern = re.compile(r'ERROR')# 发送消息到钉钉
def send_dingding_message(message):headers = {'Content-Type': 'application/json'}data = {"msgtype": "text","text": {"content": message},"at": {"isAtAll": False}}response = requests.post(dingding_webhook, json=data, headers=headers)print("DingTalk Response:", response.text)# 检查最近一个小时内有 ERROR 的日志内容
def check_logs():current_time = datetime.now()one_hour_ago = current_time - timedelta(hours=1) # 当前时间一小时前的时间for root, dirs, files in os.walk(log_root_path):for file in files:if file.endswith("info.log"):log_file_path = os.path.join(root, file)try:with open(log_file_path, 'r') as file:lines = file.readlines()for line in lines:if error_pattern.search(line):file_modified_time = datetime.fromtimestamp(os.path.getmtime(log_file_path))if file_modified_time >= one_hour_ago and file_modified_time <= current_time:error_lines = ''.join(lines)send_dingding_message(f"Error found in log file: {log_file_path}\n\n{error_lines}")breakexcept FileNotFoundError:pass # 文件可能在检查间隙被删除或移动if __name__ == "__main__":try:while True:check_logs()time.sleep(3600) # 每隔一个小时检查一次except KeyboardInterrupt:pass
筛选日志文件内最近一个小时内的日志并将匹配项的行且是整个文件的最后一条编辑成内容发送给钉钉
import time
import os
import requests
import re
from datetime import datetime, timedelta# 监控的日志文件根路径
log_root_path = '/data/exchange-web-logs/'# 钉钉机器人 Webhook URL
dingding_webhook = 'https://oapi.dingtalk.com/robot/send?access_token=aewgregtrghgyh6r5578895646y5ujuujk'# 正则表达式匹配 ERROR 字样
error_pattern = re.compile(r'ERROR')# 发送消息到钉钉
def send_dingding_message(message):headers = {'Content-Type': 'application/json'}data = {"msgtype": "text","text": {"content": message},"at": {"isAtAll": False}}response = requests.post(dingding_webhook, json=data, headers=headers)print("DingTalk Response:", response.text)# 检查最近一个小时内有 ERROR 的日志内容
def check_logs():current_time = datetime.now()one_hour_ago = current_time - timedelta(hours=1) # 当前时间一小时前的时间for root, dirs, files in os.walk(log_root_path):for file in files:if file.endswith("info.log"):log_file_path = os.path.join(root, file)last_error_line = None # 记录最后一个错误行try:with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as file:lines = file.readlines()for line in lines:if error_pattern.search(line):file_modified_time = datetime.fromtimestamp(os.path.getmtime(log_file_path))if file_modified_time >= one_hour_ago and file_modified_time <= current_time:last_error_line = line.strip() # 记录最后一个匹配到的错误行except FileNotFoundError:pass # 文件可能在检查间隙被删除或移动# 发送最后一个错误行if last_error_line:send_dingding_message(f"Error found in log file: {log_file_path}\n\n{last_error_line}")if __name__ == "__main__":try:while True:check_logs()time.sleep(3600) # 每隔一个小时检查一次except KeyboardInterrupt:pass