文章目录
- 0. 引言
- 1. 功能
- 2.使用步骤
- 3. 程序架构
- 流程图
- 结构图
- 4. 数据解析模块
- 5. 图表绘制模块
- 6. 主程序入口
- 7. 总结
- 8. 附录完整代码
0. 引言
在性能调优和系统监控中,top
命令是一种重要工具,提供了实时的系统状态信息,如 CPU 使用率、内存使用情况和进程状态。然而,仅凭命令行输出可能无法满足复杂的分析需求。
本文将介绍如何解析 top
命令的输出,并利用 Python 生成动态图表,以更直观地展示系统性能数据。
1. 功能
-
解析
top
输出:- 解析
top -1 -H -b -n 1800 -d 1
获取的进程信息,文本数据通过pandas
库处理,并使用matplotlib
生成动态图表。 - 提取每个进程的 PID、用户、CPU 使用率、内存使用率、运行时间和命令信息。
- 解析
-
生成动态图表:
- 绘制系统总体内存使用情况的动态曲线(总内存、空闲内存、使用内存、缓存/缓冲区内存)。
- 绘制指定进程或线程的 CPU 和内存使用情况的动态曲线。
2.使用步骤
- dump top文本信息
-
需要系统统计的设备中,使用如下命令获取top信息
top -1 -H -b -n 1800 -d 1 > topdump.txt
。
参数-n 1800
指定了top
命令执行的次数,即执行1800次。而参数-d 1
则指定了每次执行之间的时间间隔为1秒,意味着top
命令每隔1秒就会输出一次进程信息。 -
注意:dump top信息时,请避免做其他操作以降低干扰。
-
-
使用以下命令解析
top
输出文件并生成动态图表:python top_parser.py --file topdump.txt --process <process_name> --show_threads --save_fig
- 参数说明:
--file
:指定top
输出文件的路径。--process
:指定要绘制的进程名称。--show_threads
:可选参数,显示进程内所有线程的详细信息。--save_fig
:可选参数,保存生成的图表为 PNG 图像文件。
- 参数说明:
- 如下图是使用
python topparser.py --file topdump.txt --process terminator --save_fig
生成的:
3. 程序架构
本程序主要分为以下几个模块:
-
数据解析模块:负责解析
top
命令的输出文本,并将提取的数据存储到pandas
的数据帧中。 -
图表绘制模块:基于解析得到的数据帧,使用
matplotlib
生成动态图表。 -
主程序入口:处理命令行参数,调用数据解析模块和图表绘制模块完成数据处理和图表生成的流程。
流程图
流程图将展示数据解析模块、图表绘制模块和主程序入口之间的交互过程。以下是流程图的示意:
结构图
结构图将展示程序的整体架构,包括数据解析模块、图表绘制模块和主程序入口的功能组成及其关系。以下是结构图的示意:
4. 数据解析模块
数据解析模块的主要任务是读取 top
命令的输出文件,识别其格式并提取出需要的性能指标和进程信息。以下是核心代码片段的部分实现:
# 数据解析模块核心代码示例
import pandas as pd
import redef parse_top_output(file_path):columns = ['timestamp', 'total_mem', 'free_mem', 'used_mem', 'buff_cache_mem', 'pid', 'user', 'cpu', 'mem', 'time', 'command']data = {col: [] for col in columns}with open(file_path, 'r') as file:lines = file.readlines()timestamp = Noneformat_type = Nonefor line in lines:if line.startswith('top -'):timestamp = re.search(r'top - (\d+:\d+:\d+)', line).group(1)elif 'KiB Mem :' in line or 'GiB Mem :' in line:format_type = 'format1' if 'KiB Mem :' in line else 'format2'mem_info = re.findall(r'[\d\.]+', line)if format_type == 'format1':data['total_mem'].append(int(mem_info[0]))data['free_mem'].append(int(mem_info[1]))data['used_mem'].append(int(mem_info[2]))data['buff_cache_mem'].append(int(mem_info[3]))else:total_mem_gb = float(mem_info[0])data['total_mem'].append(total_mem_gb * 1024 * 1024)data['free_mem'].append(None)data['used_mem'].append(None)data['buff_cache_mem'].append(None)data['timestamp'].append(timestamp)data['pid'].append(None)data['user'].append(None)data['cpu'].append(None)data['mem'].append(None)data['time'].append(None)data['command'].append(None)elif re.match(r'\s*\d+', line) or re.match(r'\s*\d+\s+\w+', line):if format_type == 'format1':proc_info = re.split(r'\s+', line.strip(), maxsplit=11)data['pid'].append(int(proc_info[0]))data['user'].append(proc_info[1])data['cpu'].append(float(proc_info[8]))data['mem'].append(float(proc_info[9]))data['time'].append(proc_info[10])data['command'].append(proc_info[11] if len(proc_info) > 11 else "")elif format_type == 'format2':proc_info = re.split(r'\s+', line.strip(), maxsplit=10)data['pid'].append(int(proc_info[0]))data['user'].append(proc_info[1])try:cpu_value = float(proc_info[5].strip('%')) if '%' in proc_info[5] else float(proc_info[5])mem_value = float(proc_info[6].strip('%')) if '%' in proc_info[6] else float(proc_info[6])except ValueError:cpu_value = 0.0mem_value = 0.0data['cpu'].append(cpu_value)data['mem'].append(mem_value)data['time'].append(proc_info[7])data['command'].append(proc_info[9] if len(proc_info) > 9 else "")data['timestamp'].append(timestamp)data['total_mem'].append(None)data['free_mem'].append(None)data['used_mem'].append(None)data['buff_cache_mem'].append(None)else:data['timestamp'].append(timestamp)for key in data:if key not in ['timestamp']:data[key].append(None)df = pd.DataFrame(data)df['timestamp'] = pd.to_datetime(df['timestamp'], format='%H:%M:%S')df['relative_time'] = (df['timestamp'] - df['timestamp'].min()).dt.total_seconds()return df
5. 图表绘制模块
图表绘制模块利用 matplotlib
库生成动态图表,以下是绘制内存使用动态曲线和进程线程动态曲线的核心代码片段:
# 图表绘制模块核心代码示例
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator, AutoLocatordef plot_memory_usage(ax, df, process_name=None):if 'relative_time' not in df.columns:print("relative_time column is missing in the dataframe")returnmemory_cols = ['total_mem', 'free_mem', 'used_mem', 'buff_cache_mem']df_memory = df.dropna(subset=memory_cols).drop_duplicates(subset=['relative_time'])max_memory = df_memory[memory_cols].max().max() # 获取内存使用的最大值for col in memory_cols:ax.plot(df_memory['relative_time'], df_memory[col], label=col.replace('_', ' ').title())ax.set_xlabel('Time (seconds)')ax.set_ylabel('Memory (KiB)')ax.set_ylim(0, max_memory * 1.1 if max_memory > 0 else 1)ax.set_title('Memory Usage Over Time')if process_name:ax.text(0.5, 0.5, process_name, transform=ax.transAxes, fontsize=20, ha='center', va='center', alpha=0.7, color='black')ax.legend()ax.grid(True)ax.xaxis.set_major_locator(AutoLocator())ax.yaxis.set_major_locator(MaxNLocator(integer=True))return axdef plot_process_threads(ax, df, processes, show_threads, metric='cpu'):for process in processes:df_process = df[df['command'].str.contains(process, na=False)]if show_threads:unique_pids = df_process['pid'].unique()for pid in unique_pids:df_pid = df_process[df_process['pid'] == pid]ax.plot(df_pid['relative_time'], df_pid[metric], label=f'{process} {metric.upper()} (PID {pid})')else:df_process_grouped = df_process.groupby('relative_time').agg({metric: 'sum'}).reset_index()ax.plot(df_process_grouped['relative_time'], df_process_grouped[metric], label=f'{process} (Total {metric.upper()})')ax.set_xlabel('Time (seconds)')ax.set_ylabel(f'{metric.upper()} Usage')ax.set_title(f'{metric.upper()} Usage of Processes and Threads Over Time')ax.legend()ax.grid(True)ax.xaxis.set_major_locator(AutoLocator())ax.yaxis.set_major_locator(MaxNLocator(integer=True))return ax
6. 主程序入口
主程序入口负责处理命令行参数,并调用数据解析和图表绘制模块完成数据处理和图表生成的流程。以下是主程序入口的核心代码片段:
# 主程序入口核心代码示例
import argparsedef main(file_path, processes, show_threads, save_fig=False):df = parse_top_output(file_path)plot_all(df, processes, show_threads, save_fig)if __name__ == "__main__":parser = argparse.ArgumentParser(description='Parse and plot top command output.')parser.add_argument('--file', type=str, required=True, help='Path to the top output file')parser.add_argument('--process', type=str, nargs='+', required=True, help='List of processes to plot')parser.add_argument('--show_threads', action='store_true', help='Show CPU and memory for all threads within the process')parser.add_argument('--save_fig', action='store_true', help='Save the generated plots as PNG images')args = parser.parse_args()main(args.file, args.process, args.show_threads, args.save_fig)
7. 总结
通过本文介绍的方法,可以有效解析 top
命令输出并生成动态图表,帮助用户更直观地分析系统性能数据。该方法不仅支持不同格式的 top
输出,还能够灵活配置,满足各种监控需求。
8. 附录完整代码
import pandas as pd
import matplotlib.pyplot as plt
import re
import argparse
from matplotlib.ticker import MaxNLocator, AutoLocator# 解析top命令输出
def parse_top_output(file_path):columns = ['timestamp', 'total_mem', 'free_mem', 'used_mem', 'buff_cache_mem', 'pid', 'user', 'cpu', 'mem', 'time', 'command']data = {col: [] for col in columns}with open(file_path, 'r') as file:lines = file.readlines()timestamp = Noneformat_type = Nonefor line in lines:if line.startswith('top -'):timestamp = re.search(r'top - (\d+:\d+:\d+)', line).group(1)elif 'KiB Mem :' in line or 'GiB Mem :' in line:format_type = 'format1' if 'KiB Mem :' in line else 'format2'mem_info = re.findall(r'[\d\.]+', line)if format_type == 'format1':data['total_mem'].append(int(mem_info[0]))data['free_mem'].append(int(mem_info[1]))data['used_mem'].append(int(mem_info[2]))data['buff_cache_mem'].append(int(mem_info[3]))else:total_mem_gb = float(mem_info[0])data['total_mem'].append(total_mem_gb * 1024 * 1024)data['free_mem'].append(None)data['used_mem'].append(None)data['buff_cache_mem'].append(None)data['timestamp'].append(timestamp)data['pid'].append(None)data['user'].append(None)data['cpu'].append(None)data['mem'].append(None)data['time'].append(None)data['command'].append(None)elif re.match(r'\s*\d+', line) or re.match(r'\s*\d+\s+\w+', line):if format_type == 'format1':proc_info = re.split(r'\s+', line.strip(), maxsplit=11)data['pid'].append(int(proc_info[0]))data['user'].append(proc_info[1])data['cpu'].append(float(proc_info[8]))data['mem'].append(float(proc_info[9]))data['time'].append(proc_info[10])data['command'].append(proc_info[11] if len(proc_info) > 11 else "")elif format_type == 'format2':proc_info = re.split(r'\s+', line.strip(), maxsplit=10)data['pid'].append(int(proc_info[0]))data['user'].append(proc_info[1])try:cpu_value = float(proc_info[5].strip('%')) if '%' in proc_info[5] else float(proc_info[5])mem_value = float(proc_info[6].strip('%')) if '%' in proc_info[6] else float(proc_info[6])except ValueError:cpu_value = 0.0mem_value = 0.0data['cpu'].append(cpu_value)data['mem'].append(mem_value)data['time'].append(proc_info[7])data['command'].append(proc_info[9] if len(proc_info) > 9 else "")data['timestamp'].append(timestamp)data['total_mem'].append(None)data['free_mem'].append(None)data['used_mem'].append(None)data['buff_cache_mem'].append(None)else:data['timestamp'].append(timestamp)for key in data:if key not in ['timestamp']:data[key].append(None)df = pd.DataFrame(data)df['timestamp'] = pd.to_datetime(df['timestamp'], format='%H:%M:%S')df['relative_time'] = (df['timestamp'] - df['timestamp'].min()).dt.total_seconds()return df# 将时间戳转换为秒数
def convert_timestamp_to_seconds(timestamp):h, m, s = map(int, timestamp.split(':'))return h * 3600 + m * 60 + s# 绘制内存动态曲线
def plot_memory_usage(ax, df, process_name=None):if 'relative_time' not in df.columns:print("relative_time column is missing in the dataframe")returnmemory_cols = ['total_mem', 'free_mem', 'used_mem', 'buff_cache_mem']df_memory = df.dropna(subset=memory_cols).drop_duplicates(subset=['relative_time'])max_memory = df_memory[memory_cols].max().max() # 获取内存使用的最大值for col in memory_cols:ax.plot(df_memory['relative_time'], df_memory[col], label=col.replace('_', ' ').title())ax.set_xlabel('Time (seconds)')ax.set_ylabel('Memory (KiB)')ax.set_ylim(0, max_memory * 1.1 if max_memory > 0 else 1)ax.set_title('Memory Usage Over Time')if process_name:ax.text(0.5, 0.5, process_name, transform=ax.transAxes, fontsize=20, ha='center', va='center', alpha=0.7, color='black')ax.legend()ax.grid(True)ax.xaxis.set_major_locator(AutoLocator())ax.yaxis.set_major_locator(MaxNLocator(integer=True))return ax# 绘制进程和线程动态曲线
def plot_process_threads(ax, df, processes, show_threads, metric='cpu'):for process in processes:df_process = df[df['command'].str.contains(process, na=False)]if show_threads:unique_pids = df_process['pid'].unique()for pid in unique_pids:df_pid = df_process[df_process['pid'] == pid]ax.plot(df_pid['relative_time'], df_pid[metric], label=f'{process} {metric.upper()} (PID {pid})')else:df_process_grouped = df_process.groupby('relative_time').agg({metric: 'sum'}).reset_index()ax.plot(df_process_grouped['relative_time'], df_process_grouped[metric], label=f'{process} (Total {metric.upper()})')ax.set_xlabel('Time (seconds)')ax.set_ylabel(f'{metric.upper()} Usage')ax.set_title(f'{metric.upper()} Usage of Processes and Threads Over Time')ax.legend()ax.grid(True)ax.xaxis.set_major_locator(AutoLocator())ax.yaxis.set_major_locator(MaxNLocator(integer=True))return ax# 绘制图表
def plot_all(df, processes, show_threads, save_fig=False):for process in processes:fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 12))df_process = df[df['command'].str.contains(process, na=False)]if show_threads:plot_process_threads(axes[0], df_process, [process], show_threads, metric='mem')plot_process_threads(axes[1], df_process, [process], show_threads, metric='cpu')else:plot_memory_usage(axes[0], df, process_name=process)plot_process_threads(axes[1], df, [process], show_threads)plt.tight_layout(pad=3.0)if save_fig:fig.savefig(f'{process}_analysis.png') # 保存图像plt.show()# 主函数
def main(file_path, processes, show_threads, save_fig=False):df = parse_top_output(file_path)plot_all(df, processes, show_threads, save_fig)# 处理命令行参数
if __name__ == "__main__":parser = argparse.ArgumentParser(description='Parse and plot top command output.')parser.add_argument('--file', type=str, required=True, help='Path to the top output file')parser.add_argument('--process', type=str, nargs='+', required=True, help='List of processes to plot')parser.add_argument('--show_threads', action='store_true', help='Show CPU and memory for all threads within the process')parser.add_argument('--save_fig', action='store_true', help='Save the generated plots as PNG images')args = parser.parse_args()main(args.file, args.process, args.show_threads, args.save_fig)