网络安全-华为华三交换机防火墙日志解析示例

DEF_SYSLOG_SWITCH_HUAWEI.py

华为交换机日志解析示例

# -*- coding: utf8 -*-
import time
from DEF_COLOR import *   ## 终端显示颜色def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)## 适用于需要在SYSLOG的时间上加上时区才能和本地时间相等的情况
def 日志时间转时间戳(时间文本, 加时区):时间戳 = time.mktime(time.strptime(时间文本, '%Y-%m-%dT%H:%M:%S'))+(3600*加时区)return(时间戳)## 2022-11-28T19:00:51+08:00 主机名 %%01SHELL/5/LOGOUT(s)[3918]: The user succeeded in logging out of VTY0. (UserType=SSH, UserName=xxx, Ip=xxx.xxx.xxx.xxx, VpnName=)
def LOG_TYPE(LINE_TEXT):A = LINE_TEXT.find('%%')if A != -1:#打印_黄(LINE_TEXT[A:])B = LINE_TEXT.find('(', A)if B != -1:#打印_绿(LINE_TEXT[B:])C = LINE_TEXT.find(' ', B)  ## 找日志正文开始位置标志if C != -1:#打印_蓝(LINE_TEXT[C:])#打印_青(LINE_TEXT[A+4:B])SP = LINE_TEXT[A+4:B].split('/')return((SP[0], SP[2], C+1))else:打印_红(f"找位置标志' '失败(日志正文开始位) {LINE_TEXT}")return((' -1', LINE_TEXT[A:B], -1))else:打印_红(f"找位置标志'('失败 {LINE_TEXT}")return((LINE_TEXT[:A], '(-1', -1))else:A2 = LINE_TEXT.find(': OID')if A2 != -1:SP = LINE_TEXT[:A2].split()[-1].split('/')return((SP[0], SP[2], A2+1))else:打印_红(f"找位置标志'%%'失败且查找': OID'也失败 {LINE_TEXT}")return(('%-1', LINE_TEXT, -1))# %%01ACLE/4/ACLLOG(l)[xxx]: Acl 3997 deny GigabitEthernet0/0/10 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx udp x.x.x.x(63877) -> 239.255.255.250(1900) (1 packet).
# %%01ACLE/4/ACLLOG(l)[xxx]: Acl 3997 deny GigabitEthernet0/0/10 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx igmp x.x.x.x -> 224.0.0.22 (4 packets).
# %%01ACLE/4/ACLLOG(l)[xxx]: Acl      deny GigabitEthernet0/0/14 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx tcp x.x.x.x(60559) -> x.x.x.x(80) (1 packet).
def ACLE_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LOG_MSG):SP = LOG_MSG.split()if len(SP) == 11:SP = ['X'] + SP     # 偶尔会丢个 acl id 补齐列表长度 'GigabitEthernet0/0/14 Acl'ACL_ID = SP[1]ACL_ST = SP[2]ACL_IF = SP[3]SMAC = SP[4]DMAC = SP[6]PT  = SP[7]SIP = SP[8].split('(')[0]   # 不记录源端口号DIP = SP[10].split('(')[0]  # 不记录目的端口号COUNT = int(SP[11][1:])K = (ACL_IF, ACL_ST, PT, f"{SIP}({SMAC})", f"{DIP}({DMAC})", ACL_ID)if K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE']:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K] += COUNTelse:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K] = COUNT# SACL/4/ACLLOG(l)[9487]: Acl 3996 applied Interface GigabitEthernet0/0/6 permit (15591 packets).
# SACL/4/ACLLOG(l)[6146]: Acl 3992 applied Interface  permit (2 packets).
# SACL/4/ACLLOG(l)[6112]: Acl 3992 applied Interface GigabitEthernet0/0/14 permit (174757578 packets).
def SACL_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LOG_MSG):try:SP = LOG_MSG.split()if len(SP) == 7:ACL_ID = SP[1]ACL_IF = SP[4]ACL_ST = SP[5]COUNT = int(SP[6][1:])elif len(SP) == 6:ACL_ID = SP[1]ACL_IF = 'un'ACL_ST = SP[4]COUNT = int(SP[5][1:])else:打印_红(f"ERROR LOG_MSG={LOG_MSG} SP={SP} len(SP)={len(SP)}")K = (ACL_IF, ACL_ST, ACL_ID)if K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL']:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K] += COUNTelse:D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K] = COUNTexcept Exception as e:打印_红(f"ERROR {e} LOG_MSG={LOG_MSG} SP={SP} len(SP)={len(SP)}")else:pass## 解析SYSLOG日志一行内容
def LINE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT, TIME_LOCAL):TP1, TP2, X = LOG_TYPE(LINE_TEXT)if TP1 == 'ACLE':if TP2 == 'ACLLOG':#打印_黄(LINE_TEXT[X:-1])ACLE_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT[X:-10])else:return((1, TP1, TP2))   ## 终止,不做解析,返回标识代码及日志类型信息elif TP1 == 'SACL':if TP2 == 'ACLLOG':SACL_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT[X:-10])else:return((1, TP1, TP2))elif TP1 == 'SHELL':if TP2 == 'LOGIN':                                 # The user succeeded in logging in to VTY0. (UserType=SSH, UserName=xxx, AuthenticationMethod="Local-user", Ip=xxx.xxx.xxx.xxx, VpnName=)SP = LINE_TEXT[X:-1].split(',')UserType = SP[0].split('=')[-1]UserName = SP[1].split('=')[-1]IP = SP[3].split('=')[-1]D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'].append((TIME_LOCAL, 'LOGIN', UserType, UserName, IP))elif TP2 == 'LOGOUT':                              # The user succeeded in logging out of VTY0. (UserType=SSH, UserName=xxx, Ip=xxx.xxx.xxx.xxx, VpnName=)#print(TP1,TP2,LINE_TEXT[X:-1])SP = LINE_TEXT[X:-1].split(',')UserType = SP[0].split('=')[-1]UserName = SP[1].split('=')[-1]IP = SP[2].split('=')[-1]D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'].append((TIME_LOCAL, 'LOGOUT', UserType, UserName, IP))elif TP2 in ('DISPLAY_CMDRECORD', 'CMDRECORD'):    # %%01SHELL/6/DISPLAY_CMDRECORD(s)[1869]: Recorded display command information. (Task=VT0, Ip=x.x.x.x, VpnName=, User=xx, AuthenticationMethod="Local-user", Command="display stp brief")SP = LINE_TEXT[X:].split(',')IP = SP[1].split('=')[-1]USER = SP[3].split('=')[-1]CMD = SP[5].split('=')[-1][:-2]#打印_青(f"(SHELL, {TP2}) {TIME_LOCAL} {IP} {USER} {CMD}")D_SYSLOG_SWITCH_HUAWEI['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))elif TP2 == 'CMDCONFIRM_UNIFORMRECORD':            # %%01SHELL/6/CMDCONFIRM_UNIFORMRECORD(s)[1867]: Record command information. (Task=VT0, IP=x.x.x.x, VpnName=, User=xx, Command="", PromptInfo="The password needs to be changed. Change now? [Y/N]:", UserInput=N)SP = LINE_TEXT[X:].split(',')IP = SP[1].split('=')[-1]USER = SP[3].split('=')[-1]PromptInfo = SP[5].split('=')[-1]UserInput = SP[6].split('=')[-1][:-2]CMD = f"{PromptInfo} {UserInput}"#打印_青(f"(SHELL, {TP2}) {TIME_LOCAL} {IP} {USER} {CMD}")D_SYSLOG_SWITCH_HUAWEI['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))else:return((1, TP1, TP2))elif TP1 == 'IFPDT':if TP2 == 'PKT_OUTDISCARD_ABNL':    ## 端口【出】方向丢包达到报警阈值SP = LINE_TEXT[X:].split(',')#for i in SP:#    print(i)SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'OUT')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'OUT', '丢包超过阈值'))elif TP2 == 'PKT_OUTDISCARD_NL':    ## 端口【出】方向恢复正常SP = LINE_TEXT[X:].split(',')SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'OUT')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'OUT', '恢复'))elif TP2 == 'PKT_INDISCARD_ABNL':   ## 端口【入】方向丢包达到报警阈值SP = LINE_TEXT[X:].split(',')#for i in SP:#    print(i)SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'IN')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'IN', '丢包超过阈值'))elif TP2 == 'PKT_INDISCARD_NL':     ## 端口【入】方向恢复正常SP = LINE_TEXT[X:].split(',')SW_IF = SP[0].split('=')[-1]SW_DROP = SP[1].split('=')[-1]#print(SW_IF, SW_DROP, 'IN')D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 'IN', '恢复'))elif TP2 == 'IF_STATE':SP = LINE_TEXT[X:].split()IF_ID = SP[1]IF_ST = SP[5]if IF_ID not in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY']:D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][IF_ID] = []D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][IF_ID].append((TIME_LOCAL, IF_ST))else:return((1, TP1, TP2))elif TP1 == 'SECE':D_SYSLOG_SWITCH_HUAWEI['L_SECE'].append(LINE_TEXT)elif TP1 == 'MSTP':D_SYSLOG_SWITCH_HUAWEI['L_STP'].append((TIME_LOCAL, LINE_TEXT[X:-1]))elif TP1 == 'IFNET':print('IFNET', TP2, TIME_LOCAL, LINE_TEXT[X:-1])elif TP1 == 'IFADP':print('IFADP', TP2, TIME_LOCAL, LINE_TEXT[X:-1])elif TP1 == 'SSH':if TP2 == 'SSH_TRANS_FILE_FINISH':D_SYSLOG_SWITCH_HUAWEI['L_FTP'].append((TIME_LOCAL, LINE_TEXT[X:-1]))else:return((1, TP1, TP2))elif TP1 == 'VTY':print('VTY', LINE_TEXT)else:return((1, TP1, TP2))return((0, TP1, TP2))## 解析SYSLOG日志文件
def FILE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):#print("RUN", FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)TIME_S = time.time()TOT_N = 0SELECT_N = 0解析数量 = 0for LINE_BYTES in open(FILE_PATH, mode='br'):TOT_N += 1try:LINE_TEXT = LINE_BYTES.decode('UTF-8')except Exception as e:打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")else:TIME_UTC = LINE_TEXT[:19]#TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))+28800TIME_STAMP = 日志时间转时间戳(TIME_UTC, 8)TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))if TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX:SELECT_N += 1#print(TIME_UTC, TIME_LOCAL, 'RUN')ST,TP1,TP2 = LINE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT, TIME_LOCAL)if ST != 0:#打印_红(f"{TOT_N} 未知LOG")#breakif (TP1,TP2) in D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER']:D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][(TP1,TP2)] += 1else:D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][(TP1,TP2)] = 1else:解析数量 += 1else:#print(TIME_UTC, TIME_LOCAL, 'PASS')passTIME_RUN = time.time() - TIME_S#打印_绿(f"{FILE_PATH} 完成 处理日志数量 {SELECT_N}/{TOT_N} 筛选数/总日志数 用时={TIME_RUN:.2f}秒")return((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN))def SYSLOG_SWITCH_HUAWEI(FILE_PATH, SHOW=0):D_SYSLOG_SWITCH_HUAWEI = {}D_SYSLOG_SWITCH_HUAWEI['D_ACL'] = {'ACLE':{}, 'SACL':{}} # ACL规则匹配记录D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'] = {}  # 接口物理断开记录D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'] = {} # 接口链路断开记录D_SYSLOG_SWITCH_HUAWEI['L_CMD'] = []     # 用户执行命令记录D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'] = []   # 登录信息D_SYSLOG_SWITCH_HUAWEI['L_STP'] = []     # 生成树信息D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'] = [] # 接口丢包D_SYSLOG_SWITCH_HUAWEI['L_SECE'] = []    # 安全事件D_SYSLOG_SWITCH_HUAWEI['L_FTP'] = []D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'] = {} # 未解析日志记录TIME_STAMP_MIN = 0TIME_STAMP_MAX = time.time()    # 默认为当前时间戳R = FILE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)if SHOW == 1:打印_黄("端口丢包 (D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'])")for TIME, SW_IF, SW_DROP, IN_OUT, PS in D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP']:if PS == '恢复':打印_绿(f"    {TIME} {SW_IF} {SW_DROP} {IN_OUT:3s} {PS}")else:打印_红(f"    {TIME} {SW_IF} {SW_DROP} {IN_OUT:3s} {PS}")打印_黄("设备安全 (D_SYSLOG_SWITCH_HUAWEI['L_SECE'])")for i in D_SYSLOG_SWITCH_HUAWEI['L_SECE']:打印_红(f"    {i}")打印_黄("备份配置 (D_SYSLOG_SWITCH_HUAWEI['L_FTP'])")for i in D_SYSLOG_SWITCH_HUAWEI['L_FTP']:打印_青(f"    {i}")打印_黄("生成树   (D_SYSLOG_SWITCH_HUAWEI['L_STP'])")for i in D_SYSLOG_SWITCH_HUAWEI['L_STP']:打印_红(f"    {i}")打印_黄("访问规则 (D_SYSLOG_SWITCH_HUAWEI['D_ACL'])")for K1 in D_SYSLOG_SWITCH_HUAWEI['D_ACL']:if K1 == 'ACLE':L_K2 = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1]]L_K2.sort()for K2 in L_K2:ACL_IF, ACL_ST, PT, SIPSMAC, DIPDMAC, ACL_ID = K2if ACL_ST == 'deny':打印_红(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ID:4s} {ACL_ST:5s} {PT:4s} {SIPSMAC:31s} -> {DIPDMAC}")else:打印_绿(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ID:4s} {ACL_ST:5s} {PT:4s} {SIPSMAC:31s} -> {DIPDMAC}")elif K1 == 'SACL':L_K2 = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1]]L_K2.sort()for K2 in L_K2:ACL_IF, ACL_ST, ACL_ID = K2if ACL_ST == 'deny':打印_红(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ST:5s}")else:打印_绿(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ST:5s}")打印_黄("接口物理状态 (D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'])")for K in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY']:print(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][K])}")for TIME_LOCAL, IF_ST in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][K]:if IF_ST == 'DOWN':打印_红(f"        {TIME_LOCAL} DOWN")else:打印_绿(f"        {TIME_LOCAL} {IF_ST}")#打印_黄("接口逻辑状态 (D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'])")#for K in D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK']:#    print(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'][K])}")打印_黄("登录登出详细 (D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'])")for TIME_LOCAL, ST, UserType, UserName, IP in D_SYSLOG_SWITCH_HUAWEI['L_LOGIN']:if ST == 'LOGIN':打印_青(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")elif ST == 'LOGOUT':打印_蓝(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")else:打印_红(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")打印_黄("操作命令 (D_SYSLOG_SWITCH_HUAWEI['L_CMD'])")for TIME_LOCAL, IP, USER, CMD in D_SYSLOG_SWITCH_HUAWEI['L_CMD']:打印_青(f"    {TIME_LOCAL} {IP:15s} {USER:8s} {CMD}")打印_黄("忽略解析日志 (D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'])")L_K = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER']]L_K.sort()for K in L_K:print(f"{D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][K]:5} {K}")FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN = R时间文本格式 = '%Y-%m-%d %H:%M:%S'打印_红(f"日志路径 : {FILE_PATH}")打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")打印_绿(f"解析/筛选/总数: {解析数量}/{SELECT_N}/{TOT_N} {(解析数量/TOT_N)*100:.2f}(%)/{(SELECT_N/TOT_N)*100:.2f}(%)/100(%)  用时: {TIME_RUN:.2f}秒")## 返回需要的信息(ACL放行或阻止统计)D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}#print(D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'])for K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE']:ACL_IF = K[0]ACL_ST = K[1]ACL_ID = K[5]KEY_NEW = f"{ACL_IF} {ACL_ID}"if ACL_ST.upper() == 'PERMIT':if KEY_NEW not in D_RETURN['PERMIT']:D_RETURN['PERMIT'][KEY_NEW] = 0D_RETURN['PERMIT'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]elif ACL_ST.upper() == 'DENY':if KEY_NEW not in D_RETURN['DENY']:D_RETURN['DENY'][KEY_NEW] = 0D_RETURN['DENY'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]else:if KEY_NEW not in D_RETURN['OTHER']:D_RETURN['OTHER'][KEY_NEW] = 0D_RETURN['OTHER'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]#print(D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'])for K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL']:ACL_IF, ACL_ST, ACL_ID = KKEY_NEW = f"{ACL_IF} {ACL_ID}"if ACL_ST.upper() == 'PERMIT':D_RETURN['PERMIT'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]elif ACL_ST.upper() == 'DENY':D_RETURN['DENY'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]else:D_RETURN['OTHER'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]return(D_RETURN)if __name__ == '__main__':FILE_PATH = '华为交换机日志文件路径'SHOW = 1    # 查看重要日志信息D_ACL_INFO = SYSLOG_SWITCH_HUAWEI(FILE_PATH, SHOW)print(D_ACL_INFO)

DEF_SYSLOG_SWITCH_H3C.py

华三交换机日志示例

# -*- coding: utf8 -*-
import time
from DEF_COLOR import *   ## 终端显示颜色def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)def 日志时间转时间戳(时间文本):时间戳 = time.mktime(time.strptime(时间文本, '%Y-%m-%dT%H:%M:%S'))return(时间戳)def 参数时间转时间戳(TEXT):SP_DATE = TEXT.split('T')if len(SP_DATE) == 2:DATE = SP_DATE[0]SP_TIME = SP_DATE[1].split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = TEXTelse:DATE = time.strftime('%Y-%m-%d')SP_TIME = TEXT.split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = f"{DATE}T{TEXT}"#print(f"TEXT={TEXT}")#print(f"DATE={DATE}")#print(f"M_TIME={M_TIME}")#print(f"DATE_TIME={DATE_TIME}")M_DATE_TIME = f"%Y-%m-%dT{M_TIME}"TIME_STAMP = time.mktime(time.strptime(DATE_TIME, M_DATE_TIME))#print(f"TIME_STAMP={TIME_STAMP}")return(TIME_STAMP)def 转时间戳(TEXT):SP_DATE = TEXT.split('T')if len(SP_DATE) == 2:DATE = SP_DATE[0]SP_TIME = SP_DATE[1].split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = TEXTelse:DATE = time.strftime('%Y-%m-%d')SP_TIME = TEXT.split(':')if len(SP_TIME) == 1:M_TIME = '%H'elif len(SP_TIME) == 2:M_TIME = '%H:%M'elif len(SP_TIME) == 3:M_TIME = '%H:%M:%S'DATE_TIME = f"{DATE}T{TEXT}"#print(f"TEXT={TEXT}")#print(f"DATE={DATE}")#print(f"M_TIME={M_TIME}")print(f"DATE_TIME={DATE_TIME}")M_DATE_TIME = f"%Y-%m-%dT{M_TIME}"TIME_STAMP = time.mktime(time.strptime(DATE_TIME, M_DATE_TIME))#print(f"TIME_STAMP={TIME_STAMP}")return(TIME_STAMP)def LOG_TYPE(LINE_TEXT):A = LINE_TEXT.find('%%')         # 主机名 %%10ACL/6/PFILTER_STATIS_INFO: GigabitEthernet1/0/25 (outbound): Packet-filter if A != -1:#打印_黄(LINE_TEXT[A:])B = LINE_TEXT.find(':', A)   # 找日志类型结尾标识符号if B != -1:#打印_绿(LINE_TEXT[B:])SP = LINE_TEXT[A+4:B].split('/')    # CFGMAN/4/TRAP(t)if SP[2][-1] == ')':IDX = SP[2].find('(')if IDX != -1:return((SP[0], SP[2][:IDX], B+1))else:return((SP[0], SP[2], B+1))else:return((SP[0], SP[2], B+1))else:打印_红(f"找位置标志':'失败(日志类型结尾标识) {LINE_TEXT}")return((A, ':-1', -1))else:打印_红(f"找位置标志'%%' {LINE_TEXT}")return(('%%-1', LINE_TEXT, -1))##GigabitEthernet1/0/3 (outbound): Packet-filter 2205 rule 97 deny source 192.168.0.0 0.0.255.255 logging 14 packet(s).
##GigabitEthernet1/0/3 (inbound):  Packet-filter name in_log rule 40 permit ip destination 192.168.0.0 0.0.255.255 logging 20 packet(s).
def ACL_PFILTER_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LOG_MSG):try:IDX_1 = LOG_MSG.index(': Packet-filter ')IDX_2 = LOG_MSG.index('logging')IDX_3 = LOG_MSG.index(' packet')except:打印_红(f"ERR {LOG_MSG}")else:SW_IF = LOG_MSG[:IDX_1]if LOG_MSG[IDX_1+16:IDX_1+20] == 'name':RULE = LOG_MSG[IDX_1+21:IDX_2-1]else:RULE = LOG_MSG[IDX_1+16:IDX_2-1]NCOUNT = int(LOG_MSG[IDX_2+8:IDX_3])#打印_绿(f"|{SW_IF}|{RULE}|{NCOUNT}|")if (SW_IF, RULE) in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][(SW_IF, RULE)] += NCOUNTelse:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][(SW_IF, RULE)] = NCOUNT##GigabitEthernet1/0/5 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 129 packet(s).
##GigabitEthernet1/0/5 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 752 packet(s).
##GigabitEthernet1/0/2 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 12 packet(s).
def ACL_PFILTER_IPV6_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LOG_MSG):##打印_黄(LOG_MSG)try:IDX_1 = LOG_MSG.index(': Packet-filter ')IDX_2 = LOG_MSG.index('logging')IDX_3 = LOG_MSG.index(' packet')except:打印_红(f"ERR {LOG_MSG}")else:SW_IF = LOG_MSG[:IDX_1]#print(f"SW_IF={SW_IF}")if LOG_MSG[IDX_1+21:IDX_1+25] == 'name':#print(f"LOG_MSG[IDX_1+26:IDX_2-1]={LOG_MSG[IDX_1+26:IDX_2-1]}")RULE = LOG_MSG[IDX_1+26:IDX_2-1]else:RULE = LOG_MSG[IDX_1+21:IDX_2-1]NCOUNT = int(LOG_MSG[IDX_2+8:IDX_3])#打印_绿(f"|{SW_IF}|{RULE}|{NCOUNT}|")if (SW_IF, RULE) in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6']:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][(SW_IF, RULE)] += NCOUNTelse:D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][(SW_IF, RULE)] = NCOUNT## ARP 超过阈值
##The ARP packet rate(6 pps) exceeded the rate limit(5 pps) on interface GigabitEthernet1/0/19 in the last 60 seconds.
##The ARP packet rate(118 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds.
##The ARP packet rate(100 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds.
def ARP_RATE_EXCEEDED(D_SYSLOG_SWITCH_H3C, LOG_MSG):SP = LOG_MSG.split()实际 = SP[3]限制 = SP[8]接口 = SP[12]实际值 = int(实际[5:])限制值 = int(限制[6:])#print(f"{实际}|{实际值}|{限制}|{限制值}|{接口}")if (接口,限制值) in D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED']:D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][(接口,限制值)]+= 实际值else:D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][(接口,限制值)] = 实际值## 广播超阈值次数(广播风暴)
##GigabitEthernet1/0/21 is in normal status, BC flux exceeds its upper threshold 5 pps.
def STORM_CONSTRAIN_EXCEED(D_SYSLOG_SWITCH_H3C, LOG_MSG):接口 = LOG_MSG.split()[0]if 接口 in D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED']:D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][接口]+= 1else:D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][接口] = 1## 解析SYSLOG日志一行内容
def LINE_H3C(D_SYSLOG_SWITCH_H3C, LINE_TEXT, TIME_LOCAL):TP1, TP2, X = LOG_TYPE(LINE_TEXT)if TP1 == 'ACL':if TP2 == 'PFILTER_STATIS_INFO':#打印_黄(LINE_TEXT[X:-1])ACL_PFILTER_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])elif TP2 == 'PFILTER_IPV6_STATIS_INFO':ACL_PFILTER_IPV6_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])else:return((1, TP1, TP2))           ## 终止,不做解析,返回标识代码及日志类型信息elif TP1 == 'IFNET':if TP2 == 'PHY_UPDOWN':             ## Physical state on the interface GigabitEthernet1/0/8 changed to down.#打印_黄(LINE_TEXT[X+1:-1])SP = LINE_TEXT[X+1:-1].split()IF_ID = SP[5]IF_ST = SP[-1]#print(f'PHY  {IF_ID:21s} {IF_ST:5s} {TIME_LOCAL}')if IF_ID not in D_SYSLOG_SWITCH_H3C['D_IF_PHY']:D_SYSLOG_SWITCH_H3C['D_IF_PHY'][IF_ID] = []D_SYSLOG_SWITCH_H3C['D_IF_PHY'][IF_ID].append((TIME_LOCAL, IF_ST))elif TP2 == 'LINK_UPDOWN':          ## Line protocol state on the interface GigabitEthernet1/0/8 changed to down.#打印_黄(LINE_TEXT[X+1:-1])SP = LINE_TEXT[X+1:-1].split()IF_ID = SP[6]IF_ST = SP[-1]#print(f'LINK {IF_ID:21s} {IF_ST:5s} {TIME_LOCAL}')if IF_ID not in D_SYSLOG_SWITCH_H3C['D_IF_LINK']:D_SYSLOG_SWITCH_H3C['D_IF_LINK'][IF_ID] = []D_SYSLOG_SWITCH_H3C['D_IF_LINK'][IF_ID].append((TIME_LOCAL, IF_ST))elif TP2 == 'STORM_CONSTRAIN_EXCEED':STORM_CONSTRAIN_EXCEED(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])elif TP2 == 'STORM_CONSTRAIN_BELOW':#print(LINE_TEXT)passelse:return((1, TP1, TP2))elif TP1 == 'LLDP':if TP2 == 'LLDP_CREATE_NEIGHBOR':   ## Nearest bridge agent neighbor created on port GigabitEthernet1/0/10 (IfIndex 10), neighbor's chassis ID is xxxx-xxxx-xxxx, port ID is xxxx-xxxx-xxxx.SP = LINE_TEXT[X+1:-1].split()IF_ID  = SP[7]IF_MAC = SP[14]D_SYSLOG_SWITCH_H3C['L_LLDP'].append((TIME_LOCAL, 'CREATE', IF_ID, IF_MAC))elif TP2 == 'LLDP_DELETE_NEIGHBOR': ## Nearest bridge agent neighbor deleted on port GigabitEthernet1/0/6 (IfIndex 6), neighbor's chassis ID is xxxx-xxxx-xxxx, port ID is xxxx-xxxx-xxxx.SP = LINE_TEXT[X+1:-1].split()IF_ID  = SP[7]IF_MAC = SP[14]D_SYSLOG_SWITCH_H3C['L_LLDP'].append((TIME_LOCAL, 'DELETE', IF_ID, IF_MAC))else:return((1, TP1, TP2))elif TP1 == 'SSHS':if TP2 == 'SSHS_LOG':               ## Accepted password for xxx from xxx.xxx.xxx.xxx port 55598.pass#打印_红(LINE_TEXT.rstrip('\n'))elif TP2 == 'SSHS_SFTP_OPER':#打印_黄(LINE_TEXT[X+1:-1])D_SYSLOG_SWITCH_H3C['L_FTP'].append((TIME_LOCAL, LINE_TEXT[X+1:-1]))else:return((1, TP1, TP2))elif TP1 == 'SHELL':if TP2 == 'SHELL_CMD':               ## -Line=vty0-IPAddr=xxx.xxx.xxx.xxx-User=xxx; Command is dis cuIndex_IP = LINE_TEXT.index('-IPAddr=')Index_USER = LINE_TEXT.index('-User=')IP = LINE_TEXT[Index_IP+8:Index_USER]LOG_CMD = LINE_TEXT[Index_USER+6:].rstrip('\n')USER = LOG_CMD.split(';')[0]CMD = LOG_CMD[len(USER)+13:]##打印_红(f"{USER} {CMD}")D_SYSLOG_SWITCH_H3C['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))elif TP2 == 'SHELL_LOGIN':           ## xxx logged in from xxx.xxx.xxx.xxx.SP = LINE_TEXT[X+1:-1].split()USER = SP[0]IP = SP[4]D_SYSLOG_SWITCH_H3C['L_LOGIN'].append(('LOGIN', TIME_LOCAL, USER, IP))elif TP2 == 'SHELL_LOGOUT':          ## xxx logged out from xxx.xxx.xxx.xxx.SP = LINE_TEXT[X+1:-1].split()USER = SP[0]IP = SP[4]D_SYSLOG_SWITCH_H3C['L_LOGIN'].append(('LOGOUT', TIME_LOCAL, USER, IP))else:return((1, TP1, TP2))elif TP1 == 'CFGMAN':if TP2 == 'CFGMAN_CFGCHANGED':       ## -EventIndex=74-CommandSource=snmp-ConfigSource=startup-ConfigDestination=running; Configuration changed.D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'].append(TIME_LOCAL)else:return((1, TP1, TP2))elif TP1 == 'ARP':#print(f"LINE_TEXT={LINE_TEXT}")if TP2 == 'ARP_RATE_EXCEEDED':      ## %%10ARP/4/ARP_RATE_EXCEEDED: The ARP packet rate(100 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 secondsARP_RATE_EXCEEDED(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])else:return((1, TP1, TP2))else:return((1, TP1, TP2))return((0, TP1, TP2))## 解析SYSLOG日志文件
def FILE_H3C(D_SYSLOG_SWITCH_H3C, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):#print("RUN", FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)TIME_S = time.time()TOT_N = 0SELECT_N = 0解析数量 = 0for LINE_BYTES in open(FILE_PATH, mode='br'):TOT_N += 1try:LINE_TEXT = LINE_BYTES.decode('UTF-8')except Exception as e:打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")else:TIME_UTC = LINE_TEXT[:19]TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))if TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX:SELECT_N += 1#print(TIME_UTC, TIME_LOCAL, 'RUN')ST,TP1,TP2 = LINE_H3C(D_SYSLOG_SWITCH_H3C, LINE_TEXT, TIME_LOCAL)if ST != 0:#打印_红(f"{TOT_N} 未知LOG")#breakif (TP1,TP2) in D_SYSLOG_SWITCH_H3C['D_LOG_OTHER']:D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][(TP1,TP2)] += 1else:D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][(TP1,TP2)] = 1else:解析数量 += 1else:#print(TIME_UTC, TIME_LOCAL, 'PASS')passTIME_RUN = time.time() - TIME_Sreturn((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN))def SYSLOG_SWITCH_H3C(FILE_PATH, SHOW=0):D_SYSLOG_SWITCH_H3C = {}D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'] = {} # 重点ACL规则匹配记录D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'] = {} # 重点ACL规则匹配记录D_SYSLOG_SWITCH_H3C['D_IF_PHY']  = {} # 接口物理断开记录D_SYSLOG_SWITCH_H3C['D_IF_LINK'] = {} # 接口链路断开记录D_SYSLOG_SWITCH_H3C['L_LLDP']    = [] # 邻接设备记录D_SYSLOG_SWITCH_H3C['D_IF_MAC']  = {} # 接口电脑MAC记录D_SYSLOG_SWITCH_H3C['L_CMD']     = [] # 用户执行命令记录D_SYSLOG_SWITCH_H3C['L_LOGIN']   = [] # 登录信息D_SYSLOG_SWITCH_H3C['L_FTP']     = []D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'] = []  # 配置被修改时间记录D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'] = {} # 未解析日志记录D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'] = {} # ARP限制D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] = {}   #广播风暴达到阈值次数TIME_STAMP_MIN = 0TIME_STAMP_MAX = time.time()    # 默认为当前时间戳R = FILE_H3C(D_SYSLOG_SWITCH_H3C, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)if SHOW == 1:打印_黄("D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'] 匹配ACL日志")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']]L_K.sort()for K in L_K:PORT, RULE = KSP_RULE = RULE.split()if len(SP_RULE) > 2:if SP_RULE[3] == 'permit':打印_绿(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]:8}  {PORT:33s} {RULE}")elif SP_RULE[3] == 'deny':打印_红(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]:8}  {PORT:33s} {RULE}")else:print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")else:print(f"ERR2 {K} {FILE_PATH}")打印_黄("D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'] 匹配ACL日志")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6']]L_K.sort()for K in L_K:PORT, RULE = KSP_RULE = RULE.split()if len(SP_RULE) > 2:if SP_RULE[3] == 'permit':打印_绿(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][K]:8}  {PORT:33s} {RULE}")elif SP_RULE[3] == 'deny':打印_红(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][K]:8}  {PORT:33s} {RULE}")else:print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")else:print(f"ERR2 {K} {FILE_PATH}")打印_黄("D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'] ARP 发包超过阈值")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED']]L_K.sort()for K in L_K:合计 = D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][K]接口,限制值 = K打印_青(f"{合计:8}  {接口:22s}({限制值})")打印_黄("D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] 广播风暴发包超过阈值")L = [(D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][i],i) for i in D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED']]L.sort()for 次数,接口 in L:打印_红(f"{次数:8}  {接口:22s}")打印_黄("D_SYSLOG_SWITCH_H3C['L_LLDP'] 邻居设备变化")for TIME_LOCAL, ST, IF_ID, IF_MAC in D_SYSLOG_SWITCH_H3C['L_LLDP']:if ST == 'CREATE':打印_绿(f"    {TIME_LOCAL} {IF_ID} {IF_MAC} {ST}")else:打印_红(f"    {TIME_LOCAL} {IF_ID} {IF_MAC} {ST}")打印_黄("D_SYSLOG_SWITCH_H3C['D_IF_PHY'] 端口物理线路变化")for K in D_SYSLOG_SWITCH_H3C['D_IF_PHY']:打印_红(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_H3C['D_IF_PHY'][K])}")#打印_黄('D_SYSLOG_SWITCH_H3C['D_IF_LINK']')#for K in D_SYSLOG_SWITCH_H3C['D_IF_LINK']:#    print(f"    {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_H3C['D_IF_LINK'][K])}")打印_黄("D_SYSLOG_SWITCH_H3C['L_CMD']")for TIME_LOCAL, IP, USER, CMD in D_SYSLOG_SWITCH_H3C['L_CMD']:打印_青(f"    {TIME_LOCAL} {IP:15s} {USER:8s} '{CMD}'")打印_黄("D_SYSLOG_SWITCH_H3C['L_LOGIN'] 登录登出详细")for ST, TIME_LOCAL, USER, IP in D_SYSLOG_SWITCH_H3C['L_LOGIN']:if ST == 'LOGIN':打印_青(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")elif ST == 'LOGOUT':打印_蓝(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")else:打印_红(f"    {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")打印_黄("D_LOGIN 登录汇总")D_LOGIN = {}for ST, TIME_LOCAL, USER, IP in D_SYSLOG_SWITCH_H3C['L_LOGIN']:if ST == 'LOGIN':if (IP, USER) not in D_LOGIN:D_LOGIN[(IP, USER)] = []D_LOGIN[(IP, USER)].append(TIME_LOCAL)for K in D_LOGIN:打印_红(f"    {K} {D_LOGIN[K]}")打印_黄("D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'] 配置被修改时间记录")for i in D_SYSLOG_SWITCH_H3C['L_CFGCHANGED']:打印_紫(f"    {i}")打印_黄("D_SYSLOG_SWITCH_H3C['L_FTP']")for TIME_LOCAL, LOG_TEXT in D_SYSLOG_SWITCH_H3C['L_FTP']:打印_青(f"    {TIME_LOCAL} {LOG_TEXT}")打印_黄("D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'] 未解析日志记录")L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_LOG_OTHER']]L_K.sort()for K in L_K:print(f"{D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][K]:5} {K}")FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN = R时间文本格式 = '%Y-%m-%d %H:%M:%S'打印_红(f"日志路径 : {FILE_PATH}")打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")打印_绿(f"解析/筛选/总数: {解析数量}/{SELECT_N}/{TOT_N} {(解析数量/TOT_N)*100:.2f}(%)/{(SELECT_N/TOT_N)*100:.2f}(%)/100(%)  用时: {TIME_RUN:.2f}秒")## 返回需要的信息D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}#print(D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'])for K in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']:PORT, RULE = KSP_RULE = RULE.split()KEY_NEW = f"{PORT} {SP_RULE[0]}"if len(SP_RULE) > 2:if SP_RULE[3] == 'permit':if KEY_NEW not in D_RETURN['PERMIT']:D_RETURN['PERMIT'][KEY_NEW] = 0D_RETURN['PERMIT'][KEY_NEW] += D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]elif SP_RULE[3] == 'deny':if KEY_NEW not in D_RETURN['DENY']:D_RETURN['DENY'][KEY_NEW] = 0D_RETURN['DENY'][KEY_NEW] += D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]else:print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")else:print(f"ERR2 {K} {FILE_PATH}")return(D_RETURN)if __name__ == '__main__':FILE_PATH = '华三交换机日志文件路径'SHOW = 1    # 查看重要日志信息D_ACL_INFO = SYSLOG_SWITCH_H3C(FILE_PATH, SHOW)print(D_ACL_INFO)

DEF_SYSLOG_USG.py

华为USG防火墙

# -*- coding: utf8 -*-
import time
from DEF_COLOR import *   ## 终端显示颜色def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)def USG_LOG_TYPE(LINE_TEXT):A = LINE_TEXT.find('%')     ## 2022-12-11T16:03:31+08:00 IPS6515E %%01SECIF/6/STREAM(l)[136014]: In Last Five Minutesif A != -1:#打印_黄(LINE_TEXT[A:])B = LINE_TEXT.find('(', A)if B != -1:#打印_绿(LINE_TEXT[B:])C = LINE_TEXT.find(':', B)  # 找日志正文开始位置标志if C != -1:#打印_蓝(LINE_TEXT[C:])#打印_青(LINE_TEXT[A+4:B])SP = LINE_TEXT[A+4:B].split('/')return((SP[0], SP[2], C+1))else:打印_红(f"找位置标志':'失败(日志正文开始位) {LINE_TEXT}")return((':-1', LINE_TEXT[A:B], -1))else:打印_红(f"找位置标志'('失败 {LINE_TEXT}")return((LINE_TEXT[:A], '(-1', -1))else:#打印_红(f"找位置标志'%'失败 {LINE_TEXT}")A2 = LINE_TEXT.find('DS/4/DATASYNC_CFGCHANGE')if A2 != -1:return(('DS', 'DATASYNC_CFGCHANGE', A2+1))else:打印_红(f"找位置标志'%'失败 {LINE_TEXT}")return(('%-1', LINE_TEXT, -1))## 解析SYSLOG日志一行内容
# POLICY 规则匹配日志 规则内配置 policy logging 产生
# SECLOG/4/PACKET_DENY 丢包信息 规则内配置 session logging 产生
def USG_LOG_LINE(D_LOG_USG, LINE_TEXT, TIME_LOCAL):TP1, TP2, X = USG_LOG_TYPE(LINE_TEXT)if TP1 == 'POLICY':if TP2 in ('POLICYDENY', 'POLICYPERMIT'):#print(TP1, TP2, LINE_TEXT[X:-1])SP = LINE_TEXT[X:-1].split(',')PT = SP[1].split('=')[-1]SIP = SP[2].split('=')[-1]SPORT = SP[3].split('=')[-1]DIP   = SP[4].split('=')[-1]DPORT = SP[5].split('=')[-1]TIME  = SP[6].split('=')[-1]SZONE = SP[7].split('=')[-1]DZONE = SP[8].split('=')[-1]RULE_NAME = SP[9].split('=')[-1][:-1]if PT == '6':PT = 'TCP'elif PT == '17':PT = 'UDP'D_LOG_USG['L_SEC_RULE'].append((TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, TP2[6:]))else:return((TP1, TP2))elif TP1 == 'SECLOG':if TP2 == 'PACKET_DENY':    # IPVer=4,Protocol=tcp,SourceIP=89.248.164.165,DestinationIP=183.129.153.43,SourcePort=48397,DestinationPort=8888,DestinationNatIP=192.168.200.112,DestinationNatPort=8888,BeginTime=1671059311,EndTime=1671059311,SourceVpnID=0,DestinationVpnID=0,SourceZone=untrust,DestinationZone=trust,PolicyName=HW,CloseReason=policy-deny.#print(TP1, TP2, LINE_TEXT[X:-1])SP = LINE_TEXT[X:-1].split(',')IPVer = SP[0].split('=')[-1]Protocol = SP[1].split('=')[-1]SIP = SP[2].split('=')[-1]DIP   = SP[3].split('=')[-1]SPORT = SP[4].split('=')[-1]DPORT = SP[5].split('=')[-1]D_NAT_IP = SP[6].split('=')[-1]D_NAT_PORT = SP[7].split('=')[-1]BeginTime = int(SP[8].split('=')[-1])EndTime = int(SP[9].split('=')[-1])SZONE = SP[12].split('=')[-1]DZONE = SP[13].split('=')[-1]PolicyName = SP[14].split('=')[-1]CloseReason = SP[15].split('=')[-1][:-1]if CloseReason == 'policy-deny':丢包类型 = '安全策略丢包'elif CloseReason == 'default-policy-deny':丢包类型 = '缺省包过滤丢包'elif CloseReason == 'session miss':丢包类型 = '未命中会话丢包'elif CloseReason == 'others':丢包类型 = '其他类型丢包'else:丢包类型 = f'未知类型丢包:{CloseReason}'D_LOG_USG['L_SEC_PACKET'].append((TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, 丢包类型))elif TP2 == 'SESSION_TEARDOWN':SP = LINE_TEXT[X:-1].split(',')IPVer = SP[0].split('=')[-1]Protocol = SP[1].split('=')[-1]SIP = SP[2].split('=')[-1]DIP   = SP[3].split('=')[-1]SPORT = SP[4].split('=')[-1]DPORT = SP[5].split('=')[-1]D_NAT_IP = SP[6].split('=')[-1]D_NAT_PORT = SP[7].split('=')[-1]BeginTime = int(SP[8].split('=')[-1])EndTime = int(SP[9].split('=')[-1])SendPkts =  int(SP[10].split('=')[-1])SendBytes = int(SP[11].split('=')[-1])RcvPkts =   int(SP[12].split('=')[-1])RcvBytes =  int(SP[13].split('=')[-1])SZONE = SP[16].split('=')[-1]DZONE = SP[17].split('=')[-1]PolicyName = SP[18].split('=')[-1]CloseReason = SP[19].split('=')[-1][:-1]D_LOG_USG['L_SEC_SESSION'].append((TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, CloseReason))else:return((TP1, TP2))elif TP1 == 'SHELL':if TP2 in ('LOGIN', 'LOGOUT'):#SP = LINE_TEXT[X:-1].split(',')#print(SP)#用户类型 = SP[1].split('=')[-1]#用户名 = SP[2].split('=')[-1]#用户地址 = SP[4].split('=')[-1]#D_LOG_USG['L_LOGIN'].append((TIME_LOCAL, TP2, 用户类型, 用户名, 用户地址))D_LOG_USG['L_LOGIN'].append((TIME_LOCAL, TP1, TP2, LINE_TEXT[X:-1]))else:return((TP1, TP2))elif TP1 == 'PHY':if TP2 in ('STATUSDOWN', 'STATUSUP'):接口号 = LINE_TEXT[X:-1].split(':')[0]IF_ST = TP2[6:]if 接口号 in D_LOG_USG['D_IF_PHY']:D_LOG_USG['D_IF_PHY'][接口号].append((TIME_LOCAL, IF_ST))else:D_LOG_USG['D_IF_PHY'][接口号] = [(TIME_LOCAL, IF_ST)]else:return((TP1, TP2))else:if TP1 == '':打印_紫(LINE_TEXT[X:-1])return((TP1, TP2))return(0)## 解析SYSLOG日志文件
def USG_LOG_FILE(D_LOG_USG, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):#print("RUN", FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)TIME_S = time.time()TOT_N = 0SELECT_N = 0for LINE_BYTES in open(FILE_PATH, mode='br'):TOT_N += 1try:LINE_TEXT = LINE_BYTES.decode('GB2312')except Exception as e:打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")else:TIME_UTC = LINE_TEXT[:19]TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))#TIME_STAMP = 日志时间转时间戳(TIME_UTC, 8)TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))if TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX:SELECT_N += 1#print(TIME_UTC, TIME_LOCAL, 'RUN')R = USG_LOG_LINE(D_LOG_USG, LINE_TEXT, TIME_LOCAL)if R != 0:#打印_红(f"{TOT_N} 未知LOG")#breakif R in D_LOG_USG['D_LOG_OTHER']:D_LOG_USG['D_LOG_OTHER'][R] += 1else:D_LOG_USG['D_LOG_OTHER'][R] = 1else:#print(TIME_UTC, TIME_LOCAL, 'PASS')passTIME_RUN = time.time() - TIME_S#打印_绿(f"{FILE_PATH} 完成 处理日志数量 {SELECT_N}/{TOT_N} 筛选数/总日志数 用时={TIME_RUN:.2f}秒")return((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, TIME_RUN))def 秒数转时长表示(总秒数):天数 = 总秒数//86400剩余秒数 = 总秒数%86400时数 = 剩余秒数//3600剩余秒数 = 剩余秒数%3600分数 = 剩余秒数//60剩余秒数 = 剩余秒数%60#print(f"{天数}:{时数}:{分数}:{剩余秒数}(天:时:分:秒)")return(f"{天数:02}:{时数:02}:{分数:02}:{剩余秒数:02}")def USG(PATH_SYSLOG_FILE, SHOW=0):D_LOG_USG = {}D_LOG_USG['L_SEC_RULE'] = []D_LOG_USG['L_SEC_PACKET'] = []D_LOG_USG['L_SEC_SESSION'] = []D_LOG_USG['D_IF_PHY']  = {} # 接口物理断开记录D_LOG_USG['D_IF_LINK'] = {} # 接口链路断开记录D_LOG_USG['L_CMD'] = []     # 用户执行命令记录D_LOG_USG['L_LOGIN'] = []   # 登录信息D_LOG_USG['D_LOG_OTHER'] = {} # 未解析日志记录TIME_STAMP_MIN = 0TIME_STAMP_MAX = time.time()    # 默认为当前时间戳R = USG_LOG_FILE(D_LOG_USG, PATH_SYSLOG_FILE, TIME_STAMP_MIN, TIME_STAMP_MAX)if SHOW == 1:打印_黄("L_SEC_RULE")D_SEC_RULE = {}for TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, DoP in D_LOG_USG['L_SEC_RULE']:if DoP == 'DENY':打印_红(f"    {TIME}  {RULE_NAME:8s} {DoP:6s} {PT:4s} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} {SZONE} -> {DZONE}")#else:#    打印_绿(f"    {TIME}  {RULE_NAME:8s} {DoP:6s} {PT:4s} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} {SZONE} -> {DZONE}")KEY = (RULE_NAME, DoP)if KEY in D_SEC_RULE:D_SEC_RULE[KEY] += 1else:D_SEC_RULE[KEY] = 1for KEY in D_SEC_RULE:if KEY[1] == 'DENY':打印_红(f"    {KEY[0]:8s} {KEY[1]:6s} {D_SEC_RULE[KEY]}")else:打印_绿(f"    {KEY[0]:8s} {KEY[1]:6s} {D_SEC_RULE[KEY]}")打印_黄("L_SEC_SESSION")for TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, CloseReason in D_LOG_USG['L_SEC_SESSION']:会话秒数 = EndTime-BeginTimeif 会话秒数 < 30:打印_灰(f"    {TIME}  {PolicyName:8s} {Protocol:4s} {SIP:15s} {SPORT:5s} -> 用时:{秒数转时长表示(会话秒数)}(天:时:分:秒) {会话秒数}秒")else:打印_紫(f"    {TIME}  {PolicyName:8s} {Protocol:4s} {SIP:15s} {SPORT:5s} -> 用时:{秒数转时长表示(会话秒数)}(天:时:分:秒) {会话秒数}秒")打印_黄("L_SEC_PACKET")D_SEC_PACKET = {}for TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, 丢包类型 in D_LOG_USG['L_SEC_PACKET']:#打印_红(f"    {TIME_LOCAL} [{时间戳_2_时间文本(BeginTime)} {时间戳_2_时间文本(EndTime)}] {丢包类型} {PolicyName:8s} {Protocol}.{IPVer} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} -> {D_NAT_IP}:{D_NAT_PORT} {SZONE} -> {DZONE}")KEY = (PolicyName, 丢包类型)if KEY in D_SEC_PACKET:D_SEC_PACKET[KEY] += 1else:D_SEC_PACKET[KEY] = 1for KEY in D_SEC_PACKET:打印_红(f"    {KEY[0]:8s} {KEY[1]} {D_SEC_PACKET[KEY]}")打印_黄("D_IF_PHY")for K in D_LOG_USG['D_IF_PHY']:print(f"    {K:21s} DOWN/UP 次数 {len(D_LOG_USG['D_IF_PHY'][K])}")for TIME_LOCAL, IF_ST in D_LOG_USG['D_IF_PHY'][K]:if IF_ST == 'DOWN':打印_红(f"        {TIME_LOCAL} DOWN")else:打印_绿(f"        {TIME_LOCAL} {IF_ST}")#print("D_IF_LINK")#for K in D_LOG_USG['D_IF_LINK']:#    print(f"    {K:21s} DOWN/UP 次数 {len(D_LOG_USG['D_IF_LINK'][K])}")打印_黄("L_LOGIN")for i in D_LOG_USG['L_LOGIN']:打印_红(f"    {' '.join(i)}")打印_黄("D_LOG_OTHER")L_K = [i for i in D_LOG_USG['D_LOG_OTHER']]L_K.sort()for K in L_K:print(f"{D_LOG_USG['D_LOG_OTHER'][K]:5} {K}")FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, TIME_RUN = R时间文本格式 = '%Y-%m-%d %H:%M:%S'打印_紫(f"日志路径 : {FILE_PATH}")打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")打印_绿(f"解析/总数: {SELECT_N}/{TOT_N} {(SELECT_N/TOT_N)*100:.2f}(%)  用时: {TIME_RUN:.2f}秒")## 返回需要的信息(防火墙规则放行和阻止的统计)D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}for TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, DoP in D_LOG_USG['L_SEC_RULE']:if DoP == 'PERMIT':if RULE_NAME not in D_RETURN['PERMIT']:D_RETURN['PERMIT'][RULE_NAME] = 0D_RETURN['PERMIT'][RULE_NAME] += 1elif DoP == 'DENY':if RULE_NAME not in D_RETURN['DENY']:D_RETURN['DENY'][RULE_NAME] = 0D_RETURN['DENY'][RULE_NAME] += 1else:if RULE_NAME not in D_RETURN['OTHER']:D_RETURN['OTHER'][RULE_NAME] = 0D_RETURN['OTHER'][RULE_NAME] += 1return(D_RETURN)if __name__ == '__main__':FILE_PATH = '华为防火墙日志文件路径'SHOW = 1    # 查看重要日志信息D_RULE_INFO = USG(PATH_SYSLOG_FILE, SHOW)print(D_RULE_INFO)

DEF_COLOR.py

给点颜色看看

# -*- coding: utf8 -*-
import os## 终端显示颜色
if os.name == 'nt':       # Windowsimport ctypes,sysSTD_OUTPUT_HANDLE = -11# Windows CMD命令行 字体颜色定义 text colors黑字 = 0x00 # black.暗蓝字 = 0x01 # dark blue.暗绿字 = 0x02 # dark green.暗青字 = 0x03 # dark skyblue.暗红字 = 0x04 # dark red.暗紫字 = 0x05 # dark pink.暗黄字 = 0x06 # dark yellow.暗白字 = 0x07 # dark white.灰字 = 0x08 # dark gray.蓝字 = 0x09 # blue.绿字 = 0x0a # green.青字 = 0x0b # skyblue.红字 = 0x0c # red.紫字 = 0x0d # pink.黄字 = 0x0e # yellow.白字 = 0x0f # white.# Windows CMD命令行 背景颜色定义 background colors暗蓝底 = 0x10 # dark blue.暗绿底 = 0x20 # dark green.暗青底 = 0x30 # dark skyblue.暗红底 = 0x40 # dark red.暗紫底 = 0x50 # dark pink.暗黄底 = 0x60 # dark yellow.暗白底 = 0x70 # dark white.灰底 = 0x80 # dark gray.蓝底 = 0x90 # blue.绿底 = 0xa0 # green.青底 = 0xb0 # skyblue.红底 = 0xc0 # red.紫底 = 0xd0 # pink.黄底 = 0xe0 # yellow.白底 = 0xf0 # white.std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)def set_cmd_text_color(color, handle=std_out_handle):Bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)return Booldef resetColor():set_cmd_text_color(红字 | 绿字 | 蓝字)def 打印_黑(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黑字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(灰字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_绿(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_青(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_红(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_紫(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(紫字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_黄(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(白字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗蓝(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗蓝字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗绿(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗绿字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗青(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗青字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗红(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗红字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗紫(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗紫字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗黄(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗黄字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗白(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗白字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底黑字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黑字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底灰字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(灰字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底红字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底绿字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底黄字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底蓝字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底紫字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(紫字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底青字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底红字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底蓝字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底绿字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝底黄字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字 | 蓝底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝底白字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(白字 | 蓝底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底青字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()
elif os.name == 'posix':  # Linux'''格式: print('\033[显示方式;前景颜色;背景颜色m ..........\033[0m')print('\033[31;42m 123\033[0m')显示方式0 默认1 高亮显示4 下划线5 闪烁7 反白显示8 不可见颜色 前景色 背景色黑色     30     40红色     31     41绿色     32     42黄色     33     43蓝色     34     44紫色     35     45青色     36     46白色     37     47'''def 打印_灰(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;30;1m{TEXT}\033[0m")def 打印_红(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;31;1m{TEXT}\033[0m")def 打印_绿(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;32;1m{TEXT}\033[0m")def 打印_黄(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;33;1m{TEXT}\033[0m")def 打印_蓝(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;34;1m{TEXT}\033[0m")def 打印_紫(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;35;1m{TEXT}\033[0m")def 打印_青(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;36;1m{TEXT}\033[0m")def 打印_白(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;37;1m{TEXT}\033[0m")def 打印_白底黑字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;30;47m{TEXT}\033[0m")def 打印_白底红字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;31;47m{TEXT}\033[0m")def 打印_白底绿字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;32;47m{TEXT}\033[0m")def 打印_白底黄字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;33;47m{TEXT}\033[0m")def 打印_白底蓝字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;34;47m{TEXT}\033[0m")def 打印_白底紫字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;35;47m{TEXT}\033[0m")def 打印_白底青字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;36;47m{TEXT}\033[0m")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/876385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工业现场实测,焦化厂导烟车与装煤车风机实现无人作业

一、项目背景 作为我国重要的能源行业之一&#xff0c;焦化行业在国民经济中扮演着重要角色&#xff0c;焦化工艺是高温、高压、有毒物质等因素共同作用下进行的&#xff0c;因此存在着安全隐患&#xff0c;并伴有环境污染&#xff0c;改善焦化工艺的安全和环保问题是当前亟待…

Golang | Leetcode Golang题解之第287题寻找重复数

题目&#xff1a; 题解&#xff1a; func findDuplicate(nums []int) int {slow, fast : 0, 0for slow, fast nums[slow], nums[nums[fast]]; slow ! fast; slow, fast nums[slow], nums[nums[fast]] { }slow 0for slow ! fast {slow nums[slow]fast nums[fast]}return s…

分布式搜索引擎ES-DSL搜索详解

1.DSL搜索-入门语法 建立索引&#xff1a; xxx(自定义名称) 自定义mapping: POST /shop/_mapping {"properties": {"id": {"type": "long"},"age": {"type": "integer"},"username": {&quo…

Springboot 多数据源事务

起因 在一个service方法上使用的事务,其中有方法是调用的多数据源orderDB 但是多数据源没有生效,而是使用的primaryDB 原因 spring 事务实现的方式 以 Transactional 注解为例 (也可以看 TransactionTemplate&#xff0c; 这个流程更简单一点)。 入口&#xff1a;ProxyTransa…

Java语言程序设计——篇九(1)

&#x1f33f;&#x1f33f;&#x1f33f;跟随博主脚步&#xff0c;从这里开始→博主主页&#x1f33f;&#x1f33f;&#x1f33f; 内部类 概述内部类—内部类的分类成员内部类实战演练 局部内部类实战演练 匿名内部类实战演练 静态内部类实战演练 概述 内部类或嵌套类&#…

sheng的学习笔记-AI-公式-指数加权移动平均(EWMA)

AI目录&#xff1a;sheng的学习笔记-AI目录-CSDN博客 基础知识 指数加权移动平均&#xff08;Exponential Weighted Moving Average&#xff09;&#xff0c;是一种常用的序列处理方式 看例子&#xff0c;首先这是一年365天的温度散点图&#xff0c;以天数为横坐标&#xff0…

【学一点儿前端】本地或jenkins打包报错:getaddrinfo ENOTFOUND registry.nlark.com.

问题 今天jenkins打包一个项目&#xff0c;发现报错了 error An unexpected error occurred: "https://registry.nlark.com/xxxxxxxxxx.tgz: getaddrinfo ENOTFOUND registry.nlark.com". 先写解决方案 把yarn.lock文件里面的registry.nlark.com替换为registry.npmmi…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第三篇 嵌入式Linux驱动开发篇-第五十八章 中断下文之tasklet

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

全球性“微软蓝屏”事件及其对网络安全和系统稳定性的深远影响

近日&#xff0c;一次由微软视窗系统软件更新引发的全球性“微软蓝屏”事件&#xff0c;不仅成为科技领域的热点新闻&#xff0c;更是一次对全球IT基础设施韧性与安全性的深刻检验。这次事件源于美国电脑安全技术公司“众击”提供的一个带有“缺陷”的软件更新&#xff0c;它如…

女人内裤怎么洗才是最干净?内衣裤洗衣机怎么样?哪个牌子更好?

最近刚好用到一款比较好用的洗内衣裤洗衣机&#xff01;如果你也和我一样有洗内衣裤烦恼的&#xff0c;或者可以看看&#xff01; 内衣裤作为贴身穿的衣服&#xff0c;我是不会把它和外衣一起清洗的&#xff0c;而家里面的大洗衣机已经担起了清洗外衣的工作&#xff01; 朋友们…

JVM 内存分析工具 Memory Analyzer Tool(MAT)入门(一)

一、打开 jvisualvm &#xff08;VisualVM 是一款集成了 JDK 命令行工具和轻量级剖析功能的可视化工具。 设计用于开发和生产。&#xff09; 打开 jvisualvm.exe 工具会出现如下一些监控指标 二、VisualVM可以根据需要安装不同的插件&#xff0c;每个插件的关注点都不同&#x…

uniapp vue3 使用画布分享或者收藏功能

使用HBuilder X 开发小程序&#xff0c;大多数的画布插件很多都是vue2的写法&#xff0c;vue3的很少 我自己也试了很多个插件&#xff0c;但是有一些还是有问题&#xff0c;不好用 海报画板 - DCloud 插件市场 先将插件导入项目中 自己项目亲自用过&#xff0c;功能基本是完善…

谷粒商城-性能压测

1.压力测试 在项目上线前对其进行压力测试(以每个微服务为单元) 目的:找到系统能承载的最大负荷,找到其他测试方法更难发现的错误(两种类型:内存泄漏,并发与同步). 1.性能指标 响应时间(Response Time (RT)): 响应时间 指用户从客户端发起一个请求开始,到客户端接收到从服务…

【第五天】HTTPS和HTTP有哪些区别,HTTPS的工作原理

HTTPS和HTTP的区别&#xff1a; 1.安全性&#xff1a; HTTP是明文传输协议&#xff0c;数据在传输的过程中不加密&#xff0c;容易被窃听和篡改。HTTPS通过使用SSL或TLS协议对数据进行加密&#xff0c;确保传输的数据在网络上是安全的&#xff0c;不容易被窃取和篡改。 2.加…

leetcode-104. 二叉树的最大深度

题目描述 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3示例 2&#xff1a; 输入&#xff1a;root [1,n…

自动化测试 pytest 中 scope 限制 fixture使用范围!

导读 fixture 是 pytest 中一个非常重要的模块&#xff0c;可以让代码更加简洁。 fixture 的 autouse 为 True 可以自动化加载 fixture。 如果不想每条用例执行前都运行初始化方法(可能多个fixture)怎么办&#xff1f;可不可以只运行一次初始化方法&#xff1f; 答&#xf…

一招就能轻松解决猫咪浮毛?最新值得买的浮毛空气净化器汇总分享

那次逛街后去朋友家&#xff0c;她家猫哈基米特别热情&#xff0c;一开门就扑过来&#xff0c;朋友直接给了个大拥抱加亲亲。汗水和猫毛全粘身上了&#xff0c;看着都让人头皮痒。好多铲屎官都抱怨&#xff0c;就算天天梳毛&#xff0c;家里还是到处都是毛&#xff0c;毕竟家里…

理解文件系统(上)

模拟实现文件库 创建文件以便理解 自己想实现的文件接口&#xff0c;进行模拟实现 模拟的头文件要准备的头文件 open接口的实现 write接口的实现fflush接口的实现 flose接口的实现 文件实现 stdio.h stdio.c test.c makefile 创建makefile 编译运行 执行后输出log.txt,看…

【虚拟机】 VMware截图版详细安装教程

VMware-workstation-full-17.5.1-23298084 的安装&#xff0c;详细安装过程。 1.以管理员身份运行安装包 点击文件&#xff0c;右键打开&#xff0c;以管理员身份运行&#xff1b; 2.根据安装提示&#xff0c;重启电脑&#xff1b; &#xff08;重启与否看自己电脑情况&…

【深入理解SpringCloud微服务】深入理解Ribbon原理并手写一个微服务负载均衡器

深入理解Ribbon原理并手写一个微服务负载均衡器 负载均衡器理解Ribbon原理手写一个微服务负载均衡器总体设计LoadBalanceClientHttpRequestFactorySimpleLoadBalanceClientSimpleLoadBalancerLoadBalanceRulespring.factories与LoadBalanceConfig 负载均衡器 在微服务架构里面…