干货脚本如下:
import os
import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from aliyunsdkcore.auth.credentials import AccessKeyCredential
from openpyxl import Workbook
from openpyxl.styles import Alignment
from datetime import datetime, timedelta
from pytz import timezone
import smtplib
import email.mime.multipart
import email.mime.text
import email.mime.image
import email.mime.application


class QuerySlowLogsExporter:
    """Export Alibaba Cloud RDS slow-query logs to an Excel file and e-mail it.

    Workflow (see the ``__main__`` section at the bottom of the file):

    1. ``get_time_range`` computes the last-7-days window and the output path.
    2. ``export_query_slow_logs`` calls the RDS ``DescribeSlowLogRecords``
       action for every configured instance.
    3. ``write_to_excel`` stores the records in an ``.xlsx`` workbook.
    4. ``send_email`` mails the workbook as an attachment.
    """

    def __init__(self, access_key_id, access_key_secret, region_id='cn-xxxx', mysql_ids=None):
        """Store the credentials and create the API client.

        Args:
            access_key_id: Alibaba Cloud AccessKey ID.
            access_key_secret: Alibaba Cloud AccessKey secret.
            region_id: Region passed to ``AcsClient``.
            mysql_ids: Iterable of RDS instance IDs to query; ``None`` or an
                empty value means "no instances".
        """
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.region_id = region_id
        # Copy so later changes to the caller's list do not affect this object.
        self.mysql_ids = list(mysql_ids) if mysql_ids else []
        self.client = self.create_client()

    def create_client(self):
        """Create the ``AcsClient`` instance from the stored credentials."""
        credentials = AccessKeyCredential(self.access_key_id, self.access_key_secret)
        return AcsClient(region_id=self.region_id, credential=credentials)

    @staticmethod
    def _build_message(from_address, to_emails, cc_addresses, subject, body, attachment_paths):
        """Build the MIME message: a text body plus one part per attachment."""
        # 'mixed' is the container type for a body followed by file
        # attachments; 'related' is meant for inline resources of the body.
        main_msg = email.mime.multipart.MIMEMultipart('mixed')
        main_msg.attach(email.mime.text.MIMEText(body))

        for attachment_path in attachment_paths:
            with open(attachment_path, 'rb') as f:
                # MIMEApplication defaults to application/octet-stream and
                # base64-encodes the raw bytes, yielding exactly one
                # Content-Type header per attachment.
                att = email.mime.application.MIMEApplication(f.read())
            att.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(attachment_path),
            )
            main_msg.attach(att)

        main_msg['From'] = from_address
        main_msg['To'] = ','.join(to_emails)
        main_msg['Cc'] = ','.join(cc_addresses)
        main_msg['Subject'] = subject
        return main_msg

    def send_email(self, to_emails, subject, body, attachment_paths):
        """Send ``body`` with the given files attached over SMTP-over-SSL.

        Delivery is best-effort: SMTP and network errors are printed, not
        raised. Errors while reading an attachment file still propagate.

        Args:
            to_emails: Sequence of primary recipient addresses.
            subject: Mail subject.
            body: Plain-text mail body.
            attachment_paths: Paths of the files to attach.
        """
        # NOTE(review): sender, server and password are hard-coded
        # placeholders; consider loading real secrets from the environment
        # or a secret store instead of committing them to source.
        # Sender information.
        from_address = "xxx@xxx.cn"
        cc_addresses = ["xxx@xxx.cn"]
        # Mail server information.
        mail_host = "smtp.feishu.cn"
        mail_user = "xxx@xxx.cn"
        mail_pass = "xxxx"
        ssl_port = 465

        main_msg = self._build_message(
            from_address, to_emails, cc_addresses, subject, body, attachment_paths
        )
        full_text = main_msg.as_string()
        recipients = list(to_emails) + cc_addresses

        try:
            # The context manager sends QUIT and closes the socket. If the
            # connection itself fails there is nothing to close, and the
            # original error is reported rather than a NameError.
            with smtplib.SMTP_SSL(mail_host, ssl_port) as smtp_obj:
                smtp_obj.ehlo()
                smtp_obj.login(mail_user, mail_pass)
                smtp_obj.sendmail(from_address, recipients, full_text)
        except (smtplib.SMTPException, OSError) as e:
            print("邮件发送失败:", e)

    def get_time_range(self):
        """Return the query window for the last 7 days and the output path.

        Also creates the ``./xlsl`` output directory if it does not exist.

        Returns:
            A tuple ``(start_time, end_time, file_path)``. Both times are UTC
            strings formatted as ``%Y-%m-%dT%H:%MZ``; ``file_path`` is the
            ``.xlsx`` file named after the current UTC date.
        """
        current_time = datetime.now(timezone('UTC'))
        seven_days_ago = current_time - timedelta(days=7)

        time_format = "%Y-%m-%dT%H:%MZ"
        last_week_time = seven_days_ago.strftime(time_format)
        today_time = current_time.strftime(time_format)

        # Name of the generated log file.
        log_file_name = "saas3_time_log-" + current_time.strftime("%Y-%m-%d") + ".xlsx"

        # exist_ok avoids the check-then-create race of os.path.exists().
        xls_dir = './xlsl'
        os.makedirs(xls_dir, exist_ok=True)

        return last_week_time, today_time, os.path.join(xls_dir, log_file_name)

    def _fetch_instance_slow_logs(self, mysql_id, start_time, end_time, sql_hash, db_name, page_size):
        """Fetch every page of slow-log records for a single instance."""
        records = []
        page_number = 1
        while True:
            request = CommonRequest()
            request.set_accept_format('json')
            request.set_domain('rds.aliyuncs.com')
            request.set_method('POST')
            request.set_protocol_type('https')  # https | http
            request.set_version('2014-08-15')
            request.set_action_name('DescribeSlowLogRecords')
            request.add_query_param('StartTime', start_time)
            request.add_query_param('EndTime', end_time)
            request.add_query_param('DBInstanceId', mysql_id)
            # Optional filters are only sent when they carry a value.
            if sql_hash:
                request.add_query_param('SQLHASH', sql_hash)
            if db_name:
                request.add_query_param('DBName', db_name)
            request.add_query_param('PageSize', str(page_size))
            request.add_query_param('PageNumber', str(page_number))

            # Raises the SDK's ClientException/ServerException on failure
            # instead of returning an error body without an "Items" key.
            response = self.client.do_action_with_exception(request)
            parsed_response = json.loads(str(response, encoding='utf-8'))

            page_records = parsed_response.get("Items", {}).get("SQLSlowRecord", [])
            records.extend(page_records)

            total = parsed_response.get("TotalRecordCount")
            if not page_records or len(page_records) < page_size:
                break
            if isinstance(total, int) and len(records) >= total:
                break
            page_number += 1
        return records

    def export_query_slow_logs(self, start_time, end_time, sql_hash, db_name="", page_size=30):
        """Query slow-log records of every configured instance.

        All result pages are fetched, not only the first one.

        Args:
            start_time: Start of the window, as produced by ``get_time_range``.
            end_time: End of the window, same format.
            sql_hash: Value for the ``SQLHASH`` filter; omitted when empty.
            db_name: Value for the ``DBName`` filter; omitted when empty.
            page_size: Number of records requested per page.

        Returns:
            A list with the ``SQLSlowRecord`` dicts of all instances.
        """
        all_logs = []
        for mysql_id in self.mysql_ids:
            all_logs.extend(
                self._fetch_instance_slow_logs(
                    mysql_id, start_time, end_time, sql_hash, db_name, page_size
                )
            )
        return all_logs

    def write_to_excel(self, logs, file_path):
        """Write the records to an ``.xlsx`` workbook, slowest query first.

        Args:
            logs: List of slow-log record dicts.
            file_path: Destination path of the workbook.
        """
        # Sort by QueryTimeMS, descending; records lacking the field sort last.
        sorted_logs = sorted(
            logs, key=lambda entry: entry.get("QueryTimeMS") or 0, reverse=True
        )

        workbook = Workbook()
        worksheet = workbook.active
        worksheet.title = "最近一周慢日志明细"

        # Column titles double as the record keys, so one list drives both the
        # header row and the data rows.
        headers = [
            "QueryTimes",
            "ExecutionStartTime",
            "ReturnRowCounts",
            "DBName",
            "ParseRowCounts",
            "HostAddress",
            "QueryTimeMS",
            "SQLText",
        ]
        centered = Alignment(horizontal="center", vertical="center")
        for col_idx, header in enumerate(headers, start=1):
            worksheet.cell(row=1, column=col_idx, value=header).alignment = centered

        for row_idx, log_entry in enumerate(sorted_logs, start=2):
            for col_idx, header in enumerate(headers, start=1):
                # .get leaves the cell empty when a record lacks a field.
                worksheet.cell(row=row_idx, column=col_idx, value=log_entry.get(header))

        workbook.save(file_path)


def main():
    """Export last week's slow logs to Excel and mail the file."""
    # NOTE(review): placeholder credentials; load real keys from the
    # environment or a secret store rather than hard-coding them.
    access_key_id = "xxxxx"
    access_key_secret = "xxxxxxxx"
    mysql_ids = ["pgm-xxxxxx", "pgr-xxxxxx"]

    exporter = QuerySlowLogsExporter(access_key_id, access_key_secret, mysql_ids=mysql_ids)
    last_week_time, today_time, log_file_name = exporter.get_time_range()
    all_logs = exporter.export_query_slow_logs(last_week_time, today_time, "U2FsdGVk****")
    exporter.write_to_excel(all_logs, log_file_name)

    to_mails = ["xxxx@xxxx.com", "xxxx@xxx.com"]
    subject = "xxx慢日志通知"
    body = "xxxx慢日志通知"
    attachment_paths = [log_file_name]
    exporter.send_email(to_mails, subject, body, attachment_paths)


if __name__ == "__main__":
    main()
在这个脚本中:
- 通过 SDK 获取阿里云 RDS 的慢日志
- 把慢日志以 xlsx 表格的形式保存到本地
- 通过邮件把表格发送给相关人员