抓包统计分析DNS:解析用时、解析结果、解析状态
如图
#_*_ coding:utf8 _*_
## 网络质量分析,DNS解析慢、网页经常打开失败、慢等
## 抓包分析DNS:解析用时、解析结果、解析状态
# 2024-02-22
# Linux tcpdump 抓包示例:tcpdump -i em2 -s0 -G 600 -w %Y_%m%d_%H%M_%S.pcap 说明 在网卡em2上抓包,每600秒保存一个文件,文件名格式 年_月日_时分_秒.pcap
# Windows Wireshark 抓包:可在抓包前过滤 udp port 53 或不做过滤,抓包后另存为 pcap文件格式
import socket, struct, os, time, traceback
from binascii import b2a_hexdef 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):#时间文本格式 = '%Y-%m-%d %H:%M:%S'时间类 = time.localtime(时间戳)时间文本 = time.strftime(时间文本格式, 时间类)return(时间文本)def 时间文本_2_时间戳(时间文本, 时间文本格式='%Y-%m-%d %H:%M:%S'):## 时间文本 >>> 时间类 >>> 时间戳## 时间文本 = '2019-01-18 17:08:46'时间类 = time.strptime(时间文本, 时间文本格式)时间戳 = time.mktime(时间类)return(时间戳)## 终端显示颜色部分 ##
if os.name == 'nt': # Windowsimport ctypes,sysSTD_OUTPUT_HANDLE = -11# Windows CMD命令行 字体颜色定义 text colors黑字 = 0x00 # black.暗蓝字 = 0x01 # dark blue.暗绿字 = 0x02 # dark green.暗青字 = 0x03 # dark skyblue.暗红字 = 0x04 # dark red.暗紫字 = 0x05 # dark pink.暗黄字 = 0x06 # dark yellow.暗白字 = 0x07 # dark white.灰字 = 0x08 # dark gray.蓝字 = 0x09 # blue.绿字 = 0x0a # green.青字 = 0x0b # skyblue.红字 = 0x0c # red.紫字 = 0x0d # pink.黄字 = 0x0e # yellow.白字 = 0x0f # white.# Windows CMD命令行 背景颜色定义 background colors暗蓝底 = 0x10 # dark blue.暗绿底 = 0x20 # dark green.暗青底 = 0x30 # dark skyblue.暗红底 = 0x40 # dark red.暗紫底 = 0x50 # dark pink.暗黄底 = 0x60 # dark yellow.暗白底 = 0x70 # dark white.灰底 = 0x80 # dark gray.蓝底 = 0x90 # blue.绿底 = 0xa0 # green.青底 = 0xb0 # skyblue.红底 = 0xc0 # red.紫底 = 0xd0 # pink.黄底 = 0xe0 # yellow.白底 = 0xf0 # white.std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)def set_cmd_text_color(color, handle=std_out_handle):Bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)return Booldef resetColor():set_cmd_text_color(红字 | 绿字 | 蓝字)def 打印_黑(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黑字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(灰字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_绿(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_青(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_红(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_紫(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(紫字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_黄(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(白字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗蓝(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗蓝字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗绿(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗绿字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗青(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗青字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗红(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗红字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗紫(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗紫字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗黄(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗黄字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_暗白(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(暗白字)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底黑字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黑字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底灰字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(灰字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底红字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底绿字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底黄字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底蓝字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底紫字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(紫字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_白底青字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字 | 白底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底红字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(红字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底蓝字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(蓝字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底绿字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(绿字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝底黄字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(黄字 | 蓝底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_蓝底白字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(白字 | 蓝底)sys.stdout.write(f"{TEXT}\n")resetColor()def 打印_灰底青字(TEXT, SHOW=1):if SHOW == 1:set_cmd_text_color(青字 | 灰底)sys.stdout.write(f"{TEXT}\n")resetColor()
elif os.name == 'posix': # Linux'''格式: print('\033[显示方式;前景颜色;背景颜色m ..........\033[0m')print('\033[31;42m 123\033[0m')显示方式0 默认1 高亮显示4 下划线5 闪烁7 反白显示8 不可见颜色 前景色 背景色黑色 30 40红色 31 41绿色 32 42黄色 33 43蓝色 34 44紫色 35 45青色 36 46白色 37 47'''def 打印_灰(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;30;1m{TEXT}\033[0m")def 打印_红(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;31;1m{TEXT}\033[0m")def 打印_绿(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;32;1m{TEXT}\033[0m")def 打印_黄(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;33;1m{TEXT}\033[0m")def 打印_蓝(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;34;1m{TEXT}\033[0m")def 打印_紫(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;35;1m{TEXT}\033[0m")def 打印_青(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;36;1m{TEXT}\033[0m")def 打印_白(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;37;1m{TEXT}\033[0m")def 打印_白底黑字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;30;47m{TEXT}\033[0m")def 打印_白底红字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;31;47m{TEXT}\033[0m")def 打印_白底绿字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;32;47m{TEXT}\033[0m")def 打印_白底黄字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;33;47m{TEXT}\033[0m")def 打印_白底蓝字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;34;47m{TEXT}\033[0m")def 打印_白底紫字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;35;47m{TEXT}\033[0m")def 打印_白底青字(TEXT, SHOW=1):if SHOW == 1:print(f"\033[0;36;47m{TEXT}\033[0m")## 网络数据包解析部分 #### UDP校验
def 计算校验和(DATA):LEN = len(DATA)if LEN % 2 == 0:FMT = '!' + str(LEN//2) + 'H'else:DATA += b'\x00'LEN = len(DATA)FMT = '!' + str(LEN//2) + 'H'X = struct.unpack(FMT, DATA)SUM = 0for i in X:SUM += iwhile(SUM > 65535): ## 值大于 65535 说明二进制位数超过16bitH16 = SUM >> 16 ## 取高16位L16 = SUM & 0xffff ## 取低16位SUM = H16 + L16校验和 = SUM ^ 0xffff#打印_灰(f"计算 TCP_Checksum {hex(校验和)}")return(校验和)## 解析包头(16字节)
def Packet_Header(BytesData):PacketHeader = struct.unpack('IIII', BytesData)时间戳 = PacketHeader[0]微秒 = PacketHeader[1]抓取数据包长度 = PacketHeader[2] # 所抓获的数据包保存在pcap文件中的实际长度,以字节为单位。实际数据包长度 = PacketHeader[3] # 所抓获的数据包的真实长度,如果文件中保存不是完整的数据包,那么这个值可能要比前面的数据包长度的值大。#return(时间戳, 微秒, 抓取数据包长度, 实际数据包长度)时间戳_float = 时间戳 + 微秒/1000000return(时间戳_float, 抓取数据包长度, 实际数据包长度)## 解析帧头(14字节)
def EthernetII_Header(BytesData):DstMAC = BytesData[0:6] # 目的MAC地址SrcMAC = BytesData[6:12] # 源MAC地址FrameType = BytesData[12:14] # 帧类型return(DstMAC, SrcMAC, FrameType)## 解析IPv4头(20字节)
def IPv4_Header(BytesData):IP_B, IP_TOS, IP_Total_Length, IP_Identification, IP_H, IP_TTL, IP_Protocol, IP_Header_checksum, IP_Source, IP_Destination = struct.unpack('!BBHHHBBHII', BytesData) # BIP_Version = IP_B >> 4 # 取1字节的前4位IP_Header_Length = IP_B & 0xF # 取1字节的后4位IP_Flags = IP_H >> 13IP_Fragment_offset = IP_H & 0b0001111111111111return(IP_Version, IP_Header_Length, IP_TOS, IP_Total_Length, IP_Identification, IP_Flags, IP_Fragment_offset, IP_TTL, IP_Protocol, IP_Header_checksum, IP_Source, IP_Destination)## 解析IPv6头(40字节) 版本(Version)4位+传输类型(Traffic Class)8位+流标签(Flow Label)20位+负载长度(Payload Length)16位+下一个包头(Next Header)8位+跳数限制(Hop Limit)8位+源地址(Source Address)128位+目的地址(Destination Address)128位
def IPv6_Header(BytesData):IP_Version = BytesData[0]>>4 # 取1字节的前4位Payload_Length = struct.unpack('!H', BytesData[4:6])[0]Next_Header = BytesData[6]Hop_Limit = BytesData[7]Source_Address = BytesData[8:24]Destination_Address = BytesData[24:40]SrcIP = socket.inet_ntop(socket.AF_INET6, Source_Address)DstIP = socket.inet_ntop(socket.AF_INET6, Destination_Address)return(IP_Version, Payload_Length, Next_Header, Hop_Limit, SrcIP, DstIP)## 解析UDP头(8字节)
def UDP_Header(BytesData):SrcPort_Bytes = BytesData[0:2]DstPort_Bytes = BytesData[2:4]SrcPort = struct.unpack('>H', SrcPort_Bytes)[0]DstPort = struct.unpack('>H', DstPort_Bytes)[0]#print(f"SrcPort={SrcPort}")#print(f"DstPort={DstPort}")包长度 = struct.unpack('>H', BytesData[4:6])[0]校验值 = BytesData[6:8]return(SrcPort, DstPort, 包长度, 校验值)def DNS_标志解析(BytesData):第一字节 = BytesData[0]第二字节 = BytesData[1]QR = 第一字节>>7 # 1 1bit 10000000 取1字节的前1位操作码 = (第一字节&120)>>3 # 2-5 4bit 01111000 0b01111000=120 操作码 OpcodeAA = (第一字节&4)>>2 # 6 1bit 00000100TC = (第一字节&2)>>1 # 7 1bit 00000010RD = 第一字节&1 # 8 1bit 00000001RA = 第二字节>>7 # 1 1bit 10000000Z = (第二字节&64)>>6 # 2 1bit 01000000AD = (第二字节&32)>>5 # 3 1bit 00100000CD = (第二字节&16)>>4 # 4 1bit 00010000响应码 = 第二字节&0xF # 5-8 4bit 00001111 取1字节的后4位 响应码 Rcodereturn(QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码)def 域名数据格式解析(普通数据, BytesData_UDP_DATA):IDX = 0Bytes_NAME = b'' # 域名( Name )while 1:LEN = 普通数据[IDX] # 开头1字节表示后续长度if LEN == 0: # 开头字节==0则表示根域名.IDX += 1breakelif 普通数据[IDX]&192 == 192: # 第一个字节高2位是11表示缩写,占用2字节 0b11000000=192偏移长度 = struct.unpack('>H', 普通数据[IDX:IDX+2])[0]&16383 # 0b0011111111111111 = 16383Bytes_NAME_缩写_解析结果 = 缩写域名格式解析(BytesData_UDP_DATA, 偏移长度)Bytes_NAME += Bytes_NAME_缩写_解析结果IDX += 2breakIDX_END = IDX+1+LEN # 要拼接的长度Bytes_NAME += 普通数据[IDX+1:IDX_END]+b'.' # 拼接域名IDX = IDX_ENDreturn(Bytes_NAME)def 查询域名格式解析(BytesData, IDX):Bytes_NAME = b'' # 域名( Name )while 1:LEN = BytesData[IDX] # 开头1字节表示后续长度if LEN == 0: # 开头字节==0则表示根域名.IDX += 1breakelif BytesData[IDX]&192 == 192: # 第一个字节高2位是11表示缩写,占用2字节 0b11000000=192偏移长度 = struct.unpack('>H', BytesData[IDX:IDX+2])[0]&16383 # 0b0011111111111111 = 16383Bytes_NAME_缩写_解析结果 = 缩写域名格式解析(BytesData, 偏移长度)#Bytes_NAME_缩写_解析结果 = BytesData[IDX:IDX+2] # 不解析,返回2字节缩写原值Bytes_NAME += Bytes_NAME_缩写_解析结果IDX += 2breakIDX_END = IDX+1+LEN # 要拼接的长度Bytes_NAME += BytesData[IDX+1:IDX_END]+b'.' # 拼接域名IDX = IDX_ENDreturn(IDX, Bytes_NAME)def 缩写域名格式解析(BytesData_UDP_DATA, 偏移长度):#打印_紫(f" DEF 缩写域名格式解析 偏移长度={偏移长度} len(BytesData_UDP_DATA)={len(BytesData_UDP_DATA)} b2a_hex[{偏移长度}:]={b2a_hex(BytesData_UDP_DATA[偏移长度:])}")Bytes_NAME_缩写 = b'' # 域名( Name )IDX_缩写 = 偏移长度while 1:LEN_缩写 = BytesData_UDP_DATA[IDX_缩写]if LEN_缩写 == 0:IDX_缩写 += 1breakelif BytesData_UDP_DATA[IDX_缩写]&192 == 192: # 第一个字节高2位是11表示缩写,占用2字节 0b11000000=192偏移长度2 = struct.unpack('>H', BytesData_UDP_DATA[IDX_缩写:IDX_缩写+2])[0]&16383 # 0b0011111111111111 = 16383#打印_红(f"又缩写 偏移长度2={偏移长度2}")Bytes_NAME_缩写_解析结果 = 缩写域名格式解析(BytesData_UDP_DATA, 偏移长度2)Bytes_NAME_缩写 += Bytes_NAME_缩写_解析结果IDX_缩写 += 2breakIDX_END_缩写 = IDX_缩写+1+LEN_缩写Bytes_NAME_缩写 += BytesData_UDP_DATA[IDX_缩写+1:IDX_END_缩写]+b'.'#print(f" Bytes_NAME_缩写={Bytes_NAME_缩写} += BytesData_UDP_DATA[IDX_缩写+1:IDX_END_缩写]+b'.'={BytesData_UDP_DATA[IDX_缩写+1:IDX_END_缩写]+b'.'}")IDX_缩写 = IDX_END_缩写return(Bytes_NAME_缩写)def 解析DNS数据内容(BytesData, IDX, 记录数, 备注说明):L_DATA = []for i in range(0, 记录数):## 资源记录偏移长度 = 0#print(f" 响应 {i} 剩余总长 {len(BytesData[IDX:])}")if BytesData[IDX] == 0: # 顶级域名固定0Bytes_NAME = BytesData[IDX]IDX += 1#打印_灰(f" 顶级域名 0 Bytes_NAME={Bytes_NAME}")elif BytesData[IDX]&192 == 192: # 第一个字节高2位是11表示缩写,占用2字节 0b11000000=192两字节缩写 = BytesData[IDX:IDX+2]IDX += 2偏移长度 = struct.unpack('>H', 两字节缩写)[0]&16383 # 0b0011111111111111 = 16383#打印_灰(f" 缩写域名(2字节) Bytes_NAME={Bytes_NAME} b2a_hex={b2a_hex(Bytes_NAME)} 偏移长度={偏移长度}")Bytes_NAME = 缩写域名格式解析(BytesData, 偏移长度)else:Bytes_NAME = b'' # 域名( Name )while 1:LEN = BytesData[IDX]if LEN == 0:IDX += 1breakIDX_END = IDX+1+LENBytes_NAME += BytesData[IDX+1:IDX_END]+b'.'IDX = IDX_END#打印_灰(f" 记录域名 Bytes_NAME={Bytes_NAME} b2a_hex={b2a_hex(Bytes_NAME)}")TYPE = struct.unpack('>H', BytesData[IDX:IDX+2])[0]CLASS = struct.unpack('>H', BytesData[IDX+2:IDX+4])[0]TTL = struct.unpack('>I', BytesData[IDX+4:IDX+8])[0]DATA_LEN = struct.unpack('>H', BytesData[IDX+8:IDX+10])[0]DATA = BytesData[IDX+10:IDX+10+DATA_LEN]if TYPE == 1: # Type: A (Host Address) (1)## DATA_LEN == 4:L_DATA.append(('A', socket.inet_ntoa(DATA)))DATA_TEXT = f"A Bytes_NAME={Bytes_NAME} IPv4 {socket.inet_ntoa(DATA)}"elif TYPE == 2: # Type: NS (authoritative Name Server) (2)NAME_SERVER = 域名数据格式解析(DATA, BytesData)L_DATA.append(('NS', Bytes_NAME, NAME_SERVER))DATA_TEXT = f"NS 域名数据格式解析 Bytes_NAME={Bytes_NAME} NAME_SERVER={NAME_SERVER}"elif TYPE == 5: # Type: CNAME (Canonical NAME for an alias) (5)CNAME = 域名数据格式解析(DATA, BytesData)L_DATA.append(('CNAME', Bytes_NAME, CNAME))DATA_TEXT = f"CNAME 域名数据格式解析 Bytes_NAME={Bytes_NAME} CNAME={CNAME}"elif TYPE == 6: # Type: SOA (Start Of a zone of Authority) (6)L_DATA.append(('SOA', Bytes_NAME, DATA))DATA_TEXT = f"SOA Bytes_NAME={Bytes_NAME} 暂未处理"elif TYPE == 12: # Type: PTR (domain name PoinTeR) (12)L_DATA.append(('PTR', Bytes_NAME, DATA))pass # 21005elif TYPE == 28: # Type: AAAA (IPv6 Address) (28)## DATA_LEN == 16:L_DATA.append(('AAAA', Bytes_NAME, socket.inet_ntop(socket.AF_INET6, DATA)))DATA_TEXT = f"AAAA Bytes_NAME={Bytes_NAME} IPv6 {b2a_hex(DATA).decode('UTF-8')} {socket.inet_ntop(socket.AF_INET6, DATA)}"elif TYPE == 41: # Type: OPT (41)L_DATA.append(('OPT ROOT', Bytes_NAME, DATA))DATA_TEXT = f"OPT ROOT {b2a_hex(DATA).decode('UTF-8')}"elif TYPE == 65: # Type: HTTPS (HTTPS Specific Service Endpoints) (65)L_DATA.append(('HTTPS', Bytes_NAME, DATA))pass #7015else:L_DATA.append(('未解析', Bytes_NAME, DATA))DATA_TEXT = f"Bytes_NAME={Bytes_NAME} 未解析"IDX += 10+DATA_LEN###打印_灰(f"{备注说明} {i+1:2} TYPE={TYPE:3} CLASS={CLASS} TTL={TTL:5} DATA_LEN={DATA_LEN} DATA_TEXT={DATA_TEXT}")return(IDX, L_DATA)## 解析UDP数据内容
def UDP_Data(BytesData, UDP_DATA_TYPE):if UDP_DATA_TYPE == 'DNS':标识 = struct.unpack('>H', BytesData[0:2])[0] # identifier 2字节的ID,标识请求和对应的响应QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码 = DNS_标志解析(BytesData[2:4]) # flags 标志:DNS操作信息问题记录数 = struct.unpack('>H', BytesData[4:6])[0] # 问题记录数 question count 无符号16位整数表示报文请求段中的问题记录数。答案记录数 = struct.unpack('>H', BytesData[6:8])[0] # 答案记录数 answer count 无符号16位整数表示报文回答段中的回答记录数。授权信息记录数 = struct.unpack('>H', BytesData[8:10])[0] # 授权信息记录数 authority record count 无符号16位整数表示报文授权段中的授权记录数。附加信息记录数 = struct.unpack('>H', BytesData[10:12])[0] # 附加信息记录数 additional record count 无符号16位整数表示报文附加段中的附加记录数。L_问题记录 = []L_答案记录 = []L_授权信息记录 = []L_附加信息记录 = []IDX = 12## DNS 问题/请求/查询#打印_灰(f"问题/请求/查询 b2a_hex={b2a_hex(BytesData[IDX:])} 剩余总长 {len(BytesData[IDX:])}")for i in range(0, 问题记录数):## 解析点分域名格式 问题记录#print(f"请求 {i} b2a_hex={b2a_hex(BytesData[IDX:])}")IDX, Bytes_NAME = 查询域名格式解析(BytesData, IDX)'''Bytes_NAME = b'' # 待查询域名( Name )while 1:LEN = BytesData[IDX]if LEN == 0:IDX += 1breakIDX_END = IDX+1+LENBytes_NAME += BytesData[IDX+1:IDX_END]+b'.'IDX = IDX_END'''TYPE = struct.unpack('>H', BytesData[IDX:IDX+2])[0] # 查询类型( Type ):1 表示 A 记录,即 IP 地址;28 表示 AAAA 记录,即 IPv6 地址CLASS = struct.unpack('>H', BytesData[IDX+2:IDX+4])[0] # 类 ( Class )通常为 1 ,表示 TCP/IP 互联网地址IDX += 4L_问题记录.append(Bytes_NAME)###打印_青(f"请求查询域名 {i:2} TYPE={TYPE:3} CLASS={CLASS} Bytes_NAME={Bytes_NAME}")## DNS 答案/应答#打印_灰(f"答案/应答 b2a_hex={b2a_hex(BytesData[IDX:])} 剩余总长 {len(BytesData[IDX:])}")IDX, L_答案记录 = 解析DNS数据内容(BytesData, IDX, 答案记录数, '域名解析地址')## DNS 授权信息记录#打印_灰(f"授权信息记录 b2a_hex={b2a_hex(BytesData[IDX:])} 剩余总长 {len(BytesData[IDX:])}")IDX, L_授权信息记录 = 解析DNS数据内容(BytesData, IDX, 授权信息记录数, '授权信息记录')## DNS 附加信息记录#打印_灰(f"附加信息记录 {i} b2a_hex={b2a_hex(BytesData[IDX:])} 剩余总长 {len(BytesData[IDX:])}")IDX, L_附加信息记录 = 解析DNS数据内容(BytesData, IDX, 附加信息记录数, '附加信息记录')## 全部数据解析完成后不应该有剩余数据if len(BytesData[IDX:]) != 0:打印_红(f"剩余 {len(BytesData[IDX:])} b2a_hex={b2a_hex(BytesData[IDX:])}")return(标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_答案记录,L_授权信息记录,L_附加信息记录)else:return(len(BytesData))def IPv4_UDP_DNS(UDP_BytesData, D_IPv4_UDP_DNS, D_PCAP, SrcIP, DstIP, SrcPort, DstPort, 包长度, 校验值, 校验和, UDP_校验结果, 包编号, 时间戳, D_DEBUG, D_DNS_FLAGS):## 每一次UDP发数据生成2个KeyUDP_KEY_SD = (SrcIP,SrcPort,DstIP,DstPort) # 以当前包的源到目的生成一个KeyUDP_KEY_DS = (DstIP,DstPort,SrcIP,SrcPort) # 以当前包的目的到源生成一个Key#打印_绿(f"UDP_KEY_SD={UDP_KEY_SD}")#打印_蓝(f"UDP_KEY_DS={UDP_KEY_DS}")UDP_DATA_TYPE = 'DNS'try:标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_结果地址,L_授权信息记录,L_附加信息记录 = UDP_Data(UDP_BytesData, UDP_DATA_TYPE)TEXT_DNS_INFO = f"{标识:5} {D_DNS_FLAGS['QR'][QR]} {D_DNS_FLAGS['操作码'][操作码]} {D_DNS_FLAGS['AA'][AA]} {D_DNS_FLAGS['TC'][TC]} {D_DNS_FLAGS['RD'][RD]} {D_DNS_FLAGS['RA'][RA]} {D_DNS_FLAGS['Z'][Z]} {D_DNS_FLAGS['AD'][AD]} {D_DNS_FLAGS['CD'][CD]} {D_DNS_FLAGS['响应码'][响应码]} {问题记录数:4} {答案记录数:4} {授权信息记录数:4} {附加信息记录数:4}"TEXT = f"{包编号:6} {时间戳:17.6f} {SrcIP:15s} {DstIP:15s} {SrcPort:5} {DstPort:5} {包长度:4} {b2a_hex(校验值)} {hex(校验和):6s} {UDP_校验结果} {TEXT_DNS_INFO} {L_问题记录} {L_结果地址}"except Exception as e:D_PCAP['UDP_DNS_IPv4_ERROR'].append(包编号)if D_DEBUG['DEBUG'] == 'P':打印_红(f"{包编号:6} {时间戳:17.6f} {SrcIP:15s} {DstIP:15s} {SrcPort:5} {DstPort:5} {包长度:4} {b2a_hex(校验值)} ERROR DNS数据解析失败")if D_DEBUG['解析失败详情'] == 'P':打印_红(f"ERROR 解析失败详情:\n{traceback.format_exc()}")else:## 保存记录文件if D_DEBUG['包基本信息IPv4'].strip() != '' and D_DEBUG['包基本信息IPv4'] != 'P':with open(D_DEBUG['包基本信息IPv4'], 'a', encoding='utf-8') as f:f.write(TEXT+'\n')## 颜色打印查看if D_DEBUG['DEBUG'] == 'P':if QR == 0:if UDP_校验结果 == '校验成功':if D_DEBUG['包基本信息IPv4'] == 'P':打印_绿(TEXT)if D_DEBUG['问题记录'] == 'P' and L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if D_DEBUG['结果地址'] == 'P' and L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if D_DEBUG['授权信息记录'] == 'P' and L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if D_DEBUG['附加信息记录'] == 'P' and L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")else:if D_DEBUG['包基本信息IPv4'] == 'P':打印_灰底绿字(TEXT)if D_DEBUG['问题记录'] == 'P' and L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if D_DEBUG['结果地址'] == 'P' and L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if D_DEBUG['授权信息记录'] == 'P' and L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if D_DEBUG['附加信息记录'] == 'P' and L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")else:if UDP_校验结果 == '校验成功':if D_DEBUG['包基本信息IPv4'] == 'P':打印_青(TEXT)if D_DEBUG['问题记录'] == 'P' and L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if D_DEBUG['结果地址'] == 'P' and L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if D_DEBUG['授权信息记录'] == 'P' and L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if D_DEBUG['附加信息记录'] == 'P' and L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")else:if D_DEBUG['包基本信息IPv4'] == 'P':打印_灰底青字(TEXT)if D_DEBUG['问题记录'] == 'P' and L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if D_DEBUG['结果地址'] == 'P' and L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if D_DEBUG['授权信息记录'] == 'P' and L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if D_DEBUG['附加信息记录'] == 'P' and L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")## 保存返回内容DNS_INFO = (包编号,时间戳,包长度,UDP_校验结果, 标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_结果地址,L_授权信息记录,L_附加信息记录)if UDP_KEY_SD in D_IPv4_UDP_DNS:D_IPv4_UDP_DNS[UDP_KEY_SD].append(DNS_INFO)elif UDP_KEY_DS in D_IPv4_UDP_DNS:D_IPv4_UDP_DNS[UDP_KEY_DS].append(DNS_INFO)else:D_IPv4_UDP_DNS[UDP_KEY_SD] = [DNS_INFO]def IPv6_UDP_DNS(UDP_BytesData, D_IPv6_UDP_DNS, D_PCAP, SrcIP, DstIP, SrcPort, DstPort, 包长度, 校验值, 包编号, 时间戳, D_DEBUG, D_DNS_FLAGS):## 每一次UDP发数据生成2个KeyUDP_KEY_SD = (SrcIP,SrcPort,DstIP,DstPort) # 以当前包的源到目的生成一个KeyUDP_KEY_DS = (DstIP,DstPort,SrcIP,SrcPort) # 以当前包的目的到源生成一个KeyUDP_校验结果 = '--------'UDP_DATA_TYPE = 'DNS'try:标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_结果地址,L_授权信息记录,L_附加信息记录 = UDP_Data(UDP_BytesData, UDP_DATA_TYPE)TEXT_DNS_INFO = f"{标识:5} {D_DNS_FLAGS['QR'][QR]} {D_DNS_FLAGS['操作码'][操作码]} {D_DNS_FLAGS['AA'][AA]} {D_DNS_FLAGS['TC'][TC]} {D_DNS_FLAGS['RD'][RD]} {D_DNS_FLAGS['RA'][RA]} {D_DNS_FLAGS['Z'][Z]} {D_DNS_FLAGS['AD'][AD]} {D_DNS_FLAGS['CD'][CD]} {D_DNS_FLAGS['响应码'][响应码]} {问题记录数:4} {答案记录数:4} {授权信息记录数:4} {附加信息记录数:4}"TEXT = f"{包编号:6} {时间戳:17.6f} {SrcIP[:15]:15s} {DstIP[:15]:15s} {SrcPort:5} {DstPort:5} {包长度:4} {b2a_hex(校验值)} {'未计算'} {UDP_校验结果} {TEXT_DNS_INFO} {L_问题记录} {L_结果地址}"except Exception as e:D_PCAP['UDP_DNS_IPv6_ERROR'].append(包编号)if D_DEBUG['DEBUG'] == 'P':打印_红(f"{包编号:6} {时间戳:17.6f} {SrcIP:15s} {DstIP:15s} {SrcPort:5} {DstPort:5} {包长度:4} {b2a_hex(校验值)} ERROR DNS数据解析失败")if D_DEBUG['解析失败详情'] == 'P':打印_红(f"ERROR 解析失败详情:\n{traceback.format_exc()}")else:## 保存返回内容DNS_INFO = (包编号,时间戳,包长度,UDP_校验结果, 标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_结果地址,L_授权信息记录,L_附加信息记录)if UDP_KEY_SD in D_IPv6_UDP_DNS:D_IPv6_UDP_DNS[UDP_KEY_SD].append(DNS_INFO)elif UDP_KEY_DS in D_IPv6_UDP_DNS:D_IPv6_UDP_DNS[UDP_KEY_DS].append(DNS_INFO)else:D_IPv6_UDP_DNS[UDP_KEY_SD] = [DNS_INFO]## 保存记录文件if D_DEBUG['包基本信息IPv6'].strip() != '' and D_DEBUG['包基本信息IPv6'] != 'P':with open(D_DEBUG['包基本信息IPv6'], 'a', encoding='utf-8') as f:f.write(TEXT+'\n')## 颜色打印查看if D_DEBUG['DEBUG'] == 'P':if QR == 0:if D_DEBUG['包基本信息IPv6'] == 'P':打印_绿(TEXT)else:if D_DEBUG['包基本信息IPv6'] == 'P':打印_青(TEXT)## 解析PCAP文件
## 返回字典 D_PCAP {'TCP':数量, 'UDP':数量, }
## 返回字典 D_IPv4_UDP_DNS
## 返回字典 D_IPv6_UDP_DNS
def 解析PCAP文件中的UDP(L_PATH_PCAP_FILE, P_解析指定包编号, D_DNS_FLAGS, D_DEBUG, P_SELECT_PORT, P_SELECT_IP, P_DNS_PORT):D_PCAP = {'IEEE 802.3':0, 'IPv4':0, 'IPv6':0, 'VLAN':0, 'TCP':0, 'UDP':0, 'OTHER_IP_Protocol':{}, 'OTHER_FrameType':{}, 'UDP_DNS_IPv4_ERROR':[], 'UDP_DNS_IPv6_ERROR':[], 'UDP校验成功':0, 'UDP校验失败':0, '忽略_包编号_包数量':0, '忽略_地址_包数量':0, '忽略_端口_包数量':0}D_IPv4_UDP_DNS = {} # 键=UDP_KEY_SD/UDP_KEY_DS 值=[(),]D_IPv6_UDP_DNS = {}包编号 = 0for PATH_PCAP_FILE in L_PATH_PCAP_FILE:f = open(PATH_PCAP_FILE, 'rb') # 以二进制方式读取pcap格式文件PCAP_DATA = f.read(24) # 读取前24字节头信息,忽略while 1:包头 = f.read(16)if not 包头: # 判断 包头 是否为空(读完或者本身为空时 S 为空)break包编号 += 1#打印_黄(f"包编号={包编号}")时间戳, 抓取数据包长度, 实际数据包长度 = Packet_Header(包头)#抓取数据包长度 = PacketHeader[2] # 所抓获的数据包保存在pcap文件中的实际长度,以字节为单位。#实际数据包长度 = PacketHeader[3] # 所抓获的数据包的真实长度,如果文件中保存不是完整的数据包,那么这个值可能要比前面的数据包长度的值大。PacketData = f.read(实际数据包长度)## 跳过包if P_解析指定包编号 != set():if 包编号 not in P_解析指定包编号:D_PCAP['忽略_包编号_包数量'] += 1continue###########【EthernetII】########### 以太部首DstMAC, SrcMAC, FrameType = EthernetII_Header(PacketData[0:14]) # 以太帧头#print(DstMAC, SrcMAC, FrameType, b2a_hex(FrameType))if FrameType == b'\x81\x00': # VLAN包,去掉VLAN数据后继续解析D_PCAP['VLAN'] += 1FrameType = PacketData[16:18] # 去除VLAN后的以太帧类型PacketData = PacketData[0:12] + PacketData[16:] ## 剔除VLAN后的数据包if FrameType <= b'\x06\x00': # IEEE 802.3 Ethernet 是将Ethernet V2帧头的协议类型字段替换为帧长度字段(取值为0000-05dc;十进制的0-1500) 网上资料让用0600H作为分界D_PCAP['IEEE 802.3'] += 1continueelse:if FrameType == b'\x08\x00': # Type: IPv4 (0x0800)D_PCAP['IPv4'] += 1## 解析IPv4头(20字节[14:34])(IP_Version, IP_Header_Length, IP_TOS, IP_Total_Length, IP_Identification, IP_Flags, IP_Fragment_offset, IP_TTL, IP_Protocol, IP_Header_checksum, IP_Source, IP_Destination) = IPv4_Header(PacketData[14:34])if IP_Protocol == 6: ## 协议 b'\x06' Protocol: TCP (6)D_PCAP['TCP'] += 1elif IP_Protocol == 17: ## 协议 b'\x11' Protocol: UDP (17)D_PCAP['UDP'] += 1SrcIP_Bytes = PacketData[26:30] # 源IP地址DstIP_Bytes = PacketData[30:34] # 目的IP地址SrcIP = socket.inet_ntoa(SrcIP_Bytes)DstIP = socket.inet_ntoa(DstIP_Bytes)## 解析UDP头(8字节)SrcPort, DstPort, 包长度, 校验值 = UDP_Header(PacketData[34:42])## 筛选出指定端口号(不是想要的端口号就跳过后续解析)if len(P_SELECT_PORT) != 0:P_PORT = set((SrcPort, DstPort))if P_PORT & P_SELECT_PORT == set():D_PCAP['忽略_端口_包数量'] += 1continue## 筛选出指定IP(不是想要的IP就跳过后续解析)if len(P_SELECT_IP) != 0:P_IP = set((SrcIP, DstIP))if P_SELECT_IP & P_IP == set():D_PCAP['忽略_地址_包数量'] += 1continue## UDP伪部首及计算校验和UDP伪部首 = SrcIP_Bytes+DstIP_Bytes+b'\x00'+struct.pack('!BH', IP_Protocol, 包长度) # 构造UDP伪部首DATA_校验和计算 = UDP伪部首+PacketData[34:40]+b'\x00\x00'+PacketData[42:]校验和 = 计算校验和(DATA_校验和计算) # 校验值=b'\x19\xc4'(<class 'bytes'>) 校验和=6596(<class 'int'>)/hex(校验和)=0x19c4if 校验和 == struct.unpack('!H',校验值)[0]:UDP_校验结果 = '校验成功'D_PCAP['UDP校验成功'] +=1else:UDP_校验结果 = '校验失败'D_PCAP['UDP校验失败'] +=1## 遇到DNS端口再进行DNS内容解析if SrcPort in P_DNS_PORT or DstPort in P_DNS_PORT:IPv4_UDP_DNS(PacketData[42:], D_IPv4_UDP_DNS, D_PCAP, SrcIP, DstIP, SrcPort, DstPort, 包长度, 校验值, 校验和, UDP_校验结果, 包编号, 时间戳, D_DEBUG, D_DNS_FLAGS)else:#打印_红(f"{包编号:6} OTHER_IP_Protocol={IP_Protocol}")if IP_Protocol in D_PCAP['OTHER_IP_Protocol']:D_PCAP['OTHER_IP_Protocol'][IP_Protocol] += 1else:D_PCAP['OTHER_IP_Protocol'][IP_Protocol] = 1elif FrameType == b'\x86\xdd': # Type: IPv6 (0x86dd)D_PCAP['IPv6'] += 1IP_Version, Payload_Length, Next_Header, Hop_Limit, SrcIP, DstIP = IPv6_Header(PacketData[14:54])#打印_紫(f" IP_Version={IP_Version} Payload_Length={Payload_Length} Next_Header={Next_Header} Hop_Limit={Hop_Limit} SrcIP={SrcIP} DstIP={DstIP}")## 解析UDP头(8字节)SrcPort, DstPort, 包长度, 校验值 = UDP_Header(PacketData[54:62])#打印_紫(f" SrcPort={SrcPort} DstPort={DstPort} 包长度={包长度} 校验值={校验值}")if Next_Header == 17: # UDP (17)## 遇到DNS端口再进行DNS内容解析if SrcPort in P_DNS_PORT or DstPort in P_DNS_PORT:IPv6_UDP_DNS(PacketData[62:], D_IPv6_UDP_DNS, D_PCAP, SrcIP, DstIP, SrcPort, DstPort, 包长度, 校验值, 包编号, 时间戳, D_DEBUG, D_DNS_FLAGS)else:#打印_红(f"{包编号:6} OTHER_FrameType={FrameType}")if FrameType not in D_PCAP['OTHER_FrameType']:D_PCAP['OTHER_FrameType'][FrameType] = 0D_PCAP['OTHER_FrameType'][FrameType] += 1f.close()return(D_PCAP, D_IPv4_UDP_DNS, D_IPv6_UDP_DNS)## 分析部分 ##def 遍历目录提取文件名_带路径(DIR, 指定扩展名=''):L_FILE_NAME_PATH = []for root, dirs, files in os.walk(DIR, topdown=False):if 指定扩展名 == '':L_FILE_NAME_PATH += [os.path.join(root, name) for name in files]else:指定扩展名 = 指定扩展名.lower()L_FILE_NAME_PATH += [os.path.join(root, name) for name in files if name.split('.')[-1].lower()==指定扩展名]return(L_FILE_NAME_PATH)def 生成处理文件列表(PCAP_File_OR_Dir, 指定扩展名=''):L_PCAP = []if os.path.exists(PCAP_File_OR_Dir): # 判断 目录、文件 是否存在if os.path.isfile(PCAP_File_OR_Dir):L_PCAP.append(PCAP_File_OR_Dir)elif os.path.isdir(PCAP_File_OR_Dir):L_FileName = 遍历目录提取文件名_带路径(PCAP_File_OR_Dir, 指定扩展名) # 目录内文件列表(只取文件)L_FileName.sort()L_PCAP += L_FileNameelse:打印_红(f"PCAP_File_OR_Dir={PCAP_File_OR_Dir} 不存在")return(L_PCAP)def 初始化目录(DIR_WORK, D_DEBUG):## 初始化工作目录if DIR_WORK.replace(' ', '') == '':DIR_WORK = os.getcwd() # 使用当前目录为主目录else:if os.path.exists(DIR_WORK):if os.path.isdir(DIR_WORK):print(f"已经存在目录DIR_WORK={DIR_WORK}")else:print(f"已经存在文件DIR_WORK={DIR_WORK}")DIR_WORK = os.getcwd() # 使用当前目录为主目录else:print(f"新建根目录 DIR_WORK={DIR_WORK}")try:os.makedirs(DIR_WORK)except:DIR_WORK = os.getcwd() # 使用当前目录为主目录print(f"设置主目录为:{DIR_WORK}")os.chdir(DIR_WORK) # 改变当前工作目录## 初始化记录文件if D_DEBUG['包基本信息IPv4'].strip() != '' and D_DEBUG['包基本信息IPv4'] != 'P':with open(D_DEBUG['包基本信息IPv4'], 'w', encoding='utf-8') as f:f.write(时间戳_2_时间文本(time.time())+'\n')标题 = " ID TIME SrcIP DstIP SPort DPort LEN 校验码 校验值 校验结果 标志 QR 操作码 AA TC RD RA Z AD CD 响应码 问题 答案 授权 附加 L_查询域名/L_结果地址"f.write(f"{标题}\n")if D_DEBUG['包基本信息IPv6'].strip() != '' and D_DEBUG['包基本信息IPv6'] != 'P':with open(D_DEBUG['包基本信息IPv6'], 'w', encoding='utf-8') as f:f.write(时间戳_2_时间文本(time.time())+'\n')标题 = " ID TIME SrcIP DstIP SPort DPort LEN 校验码 校验值 校验结果 标志 QR 操作码 AA TC RD RA Z AD CD 响应码 问题 答案 授权 附加 L_查询域名/L_结果地址"f.write(f"{标题}\n")if D_DEBUG['解析用时统计'].strip() != '' and D_DEBUG['解析用时统计'] != 'P':with open(D_DEBUG['解析用时统计'], 'w', encoding='utf-8') as f:f.write(时间戳_2_时间文本(time.time())+'\n')if D_DEBUG['解析状态统计'].strip() != '' and D_DEBUG['解析状态统计'] != 'P':with open(D_DEBUG['解析状态统计'], 'w', encoding='utf-8') as f:f.write(时间戳_2_时间文本(time.time())+'\n')if D_DEBUG['查询域名统计_分计'].strip() != '' and D_DEBUG['查询域名统计_分计'] != 'P':with open(D_DEBUG['查询域名统计_分计'], 'w', encoding='utf-8') as f:f.write(时间戳_2_时间文本(time.time())+'\n')if D_DEBUG['查询域名统计_总计'].strip() != '' and D_DEBUG['查询域名统计_总计'] != 'P':with open(D_DEBUG['查询域名统计_总计'], 'w', encoding='utf-8') as f:f.write(时间戳_2_时间文本(time.time())+'\n')def 用时分布_合并(L):D_TIME = {'<1ms':0, '<5ms':0, '<10ms':0, '<50ms':0, '<100ms':0, '<500ms':0, '<1s':0, '<5s':0, '>5s':0}for i in L:if i < 0.001:D_TIME['<5s'] +=1D_TIME['<1s'] +=1D_TIME['<500ms'] +=1D_TIME['<100ms'] +=1D_TIME['<50ms'] +=1D_TIME['<10ms'] +=1D_TIME['<5ms'] +=1D_TIME['<1ms'] +=1elif i < 0.005:D_TIME['<5s'] +=1D_TIME['<1s'] +=1D_TIME['<500ms'] +=1D_TIME['<100ms'] +=1D_TIME['<50ms'] +=1D_TIME['<10ms'] +=1D_TIME['<5ms'] +=1elif i < 0.01:D_TIME['<5s'] +=1D_TIME['<1s'] +=1D_TIME['<500ms'] +=1D_TIME['<100ms'] +=1D_TIME['<50ms'] +=1D_TIME['<10ms'] +=1elif i < 0.05:D_TIME['<5s'] +=1D_TIME['<1s'] +=1D_TIME['<500ms'] +=1D_TIME['<100ms'] +=1D_TIME['<50ms'] +=1elif i < 0.1:D_TIME['<5s'] +=1D_TIME['<1s'] +=1D_TIME['<500ms'] +=1D_TIME['<100ms'] +=1elif i < 0.5:D_TIME['<5s'] +=1D_TIME['<1s'] +=1D_TIME['<500ms'] +=1elif i < 1:D_TIME['<5s'] +=1D_TIME['<1s'] +=1elif i < 5:D_TIME['<5s'] +=1else:D_TIME['>5s'] +=1return(D_TIME)def 用时分布_分割(L):D_TIME = {'<1ms':0, '1~5ms':0, '5~10ms':0, '10~50ms':0, '50~100ms':0, '100~500ms':0, '0.5~1s':0, '1~5s':0, '>5s':0}for i in L:if i < 0.001:D_TIME['<1ms'] +=1elif 0.001 <= i < 0.005:D_TIME['1~5ms'] +=1elif 0.005 <= i < 0.01:D_TIME['5~10ms'] +=1elif 0.01 < i < 0.05:D_TIME['10~50ms'] +=1elif 0.05 <= i < 0.1:D_TIME['50~100ms'] +=1elif 0.1 <= i < 0.5:D_TIME['100~500ms'] +=1elif 0.5 <= i < 1:D_TIME['0.5~1s'] +=1elif 1 <= i < 5:D_TIME['1~5s'] +=1else:D_TIME['>5s'] +=1return(D_TIME)def 统计分析_解析用时(D_UDP_DNS_用时):标题 = " 次数 最大(秒) 最小(秒) 平均(秒) <1ms 1~5ms 5~10ms 10~50ms 50~100ms 100~500ms 0.5~1s 1~5s >5s"TEXT = ''if len(D_UDP_DNS_用时) != 0:for KEY in D_UDP_DNS_用时:IPa,IPb = KEYL_UDP_DNS_TIME = D_UDP_DNS_用时[KEY]LEN_TIMEs = len(L_UDP_DNS_TIME)D_TIME = 用时分布_分割(L_UDP_DNS_TIME)if LEN_TIMEs != 0:TEXT += f"{LEN_TIMEs:5} {max(L_UDP_DNS_TIME):9.6f} {min(L_UDP_DNS_TIME):9.6f} {sum(L_UDP_DNS_TIME)/LEN_TIMEs:9.6f} {D_TIME['<1ms']:4} {D_TIME['1~5ms']:4} {D_TIME['5~10ms']:4} {D_TIME['10~50ms']:4} {D_TIME['50~100ms']:4} {D_TIME['100~500ms']:4} {D_TIME['0.5~1s']:4} {D_TIME['1~5s']:4} {D_TIME['>5s']:4} {IPa:15s} {IPb:15s}\n"else:TEXT += f"{LEN_TIMEs:5} {IPa:15s} {IPb:15s}\n"return(标题, TEXT)def 统计分析_解析状态(D_UDP_DNS_响应码):标题 = " 次数 正常响应次数/占比 域名错误次数/占比 拒绝响应次数/占比 服务器失败数/占比 其他状态次数/占比"TEXT = ''if len(D_UDP_DNS_响应码) != 0:for KEY in D_UDP_DNS_响应码:IPa,IPb = KEYL_UDP_DNS_响应码 = D_UDP_DNS_响应码[KEY]LEN = len(L_UDP_DNS_响应码)if LEN != 0:正常响应次数=L_UDP_DNS_响应码.count(0)服务器失败数=L_UDP_DNS_响应码.count(2)域名错误次数=L_UDP_DNS_响应码.count(3)拒绝响应次数=L_UDP_DNS_响应码.count(5)其他状态次数=LEN-正常响应次数-服务器失败数-域名错误次数-拒绝响应次数TEXT += f"{LEN:5} {正常响应次数:4}/{(正常响应次数/LEN)*100:>3.0f}% {域名错误次数:4}/{(域名错误次数/LEN)*100:>3.0f}% {拒绝响应次数:4}/{(拒绝响应次数/LEN)*100:>3.0f}% {服务器失败数:4}/{(服务器失败数/LEN)*100:>3.0f}% {其他状态次数:4}/{(其他状态次数/LEN)*100:>3.0f}% {IPa:15s} {IPb:15s}\n"else:TEXT += f"{LEN:5} {IPa:15s} {IPb:15s}\n"return(标题, TEXT)def 统计分析_查询域名_分计(D_UDP_DNS_查域名):标题 = "分计\n 次数 域名"TEXT = ''if len(D_UDP_DNS_查域名) != 0:for KEY in D_UDP_DNS_查域名:IPa,IPb = KEYL_NAME = [(D_UDP_DNS_查域名[KEY][k], k) for k in D_UDP_DNS_查域名[KEY]]L_NAME.sort()for 次数,域名 in L_NAME:TEXT += f"{次数:5} {域名} \t\t {IPa:15s} {IPb:15s}\n"return(标题, TEXT)def 统计分析_查询域名_总计(D_查询域名_ALL):标题 = "合计\n 次数 域名"TEXT = ''if len(D_查询域名_ALL) != 0:L_NAME = [(D_查询域名_ALL[k], k) for k in D_查询域名_ALL]L_NAME.sort()for 次数,域名 in L_NAME:TEXT += f"{次数:5} {域名}\n"return(标题, TEXT)def 统计分析(D_UDP_DNS, D_DEBUG, 备注):print(f"\n统计分析 {备注}")D_查询域名_ALL = {}D_UDP_DNS_查域名 = {}D_UDP_DNS_用时 = {}D_UDP_DNS_响应码 = {} # 成功失败统计 {(SrcIP,DstIP):[响应码,], }for KEY in D_UDP_DNS:SrcIP,SrcPort,DstIP,DstPort = KEY#打印_黄(f"KEY={KEY}")D_UDP_DNS_TIME = {}for 包编号,时间戳,包长度,UDP_校验结果, 标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_结果地址,L_授权信息记录,L_附加信息记录 in D_UDP_DNS[KEY]:if 标识 not in D_UDP_DNS_TIME:D_UDP_DNS_TIME[标识] = {'查询':0, '应答':0}if QR == 0: # 查询## 统计所有客户端查询不同的域名次数for 问题记录 in L_问题记录:if 问题记录 in D_查询域名_ALL:D_查询域名_ALL[问题记录] +=1else:D_查询域名_ALL[问题记录] =1## 统计每个客户端查询不同的域名次数if (SrcIP,DstIP) in D_UDP_DNS_查域名:for 问题记录 in L_问题记录:if 问题记录 in D_UDP_DNS_查域名[(SrcIP,DstIP)]:D_UDP_DNS_查域名[(SrcIP,DstIP)][问题记录] +=1else:D_UDP_DNS_查域名[(SrcIP,DstIP)][问题记录] =1elif (DstIP,SrcIP) in D_UDP_DNS_查域名:for 问题记录 in L_问题记录:if 问题记录 in D_UDP_DNS_查域名[(DstIP,SrcIP)]:D_UDP_DNS_查域名[(DstIP,SrcIP)][问题记录] +=1else:D_UDP_DNS_查域名[(DstIP,SrcIP)][问题记录] =1else:D_UDP_DNS_查域名[(SrcIP,DstIP)] = {}for 问题记录 in L_问题记录:if 问题记录 in D_UDP_DNS_查域名[(SrcIP,DstIP)]:D_UDP_DNS_查域名[(SrcIP,DstIP)][问题记录] +=1else:D_UDP_DNS_查域名[(SrcIP,DstIP)][问题记录] =1## 记录查询请求时间信息if D_UDP_DNS_TIME[标识]['查询'] == 0:D_UDP_DNS_TIME[标识]['查询'] = 时间戳else:if D_DEBUG['重复查询'] == 'P':打印_红(f"{标识:5} 重复查询?{KEY}\t 包编号={包编号:5} {时间戳_2_时间文本(时间戳)}")else:## 统计应答正常或异常if (SrcIP,DstIP) in D_UDP_DNS_响应码:D_UDP_DNS_响应码[(SrcIP,DstIP)].append(响应码)elif (DstIP,SrcIP) in D_UDP_DNS_响应码:D_UDP_DNS_响应码[(SrcIP,DstIP)].append(响应码)else:D_UDP_DNS_响应码[(SrcIP,DstIP)] = [响应码]## 计算解析用时if D_UDP_DNS_TIME[标识]['应答'] == 0:D_UDP_DNS_TIME[标识]['应答'] = 时间戳else:if D_DEBUG['重复应答'] == 'P':打印_红(f"{标识:5} 重复应答?{KEY}\t 包编号={包编号:5} {时间戳_2_时间文本(时间戳)}")if D_UDP_DNS_TIME[标识]['查询'] != 0:用时 = 时间戳-D_UDP_DNS_TIME[标识]['查询']if (SrcIP,DstIP) in D_UDP_DNS_用时:D_UDP_DNS_用时[(SrcIP,DstIP)].append(用时)elif (DstIP,SrcIP) in D_UDP_DNS_用时:D_UDP_DNS_用时[(DstIP,SrcIP)].append(用时)else:D_UDP_DNS_用时[(SrcIP,DstIP)] = [用时]#打印_灰(f" D_UDP_DNS_TIME={D_UDP_DNS_TIME}")if D_DEBUG['解析用时统计'] == 'P':标题, TEXT = 统计分析_解析用时(D_UDP_DNS_用时)打印_青(f"\n解析用时{'-'*101}")打印_黄(标题)print(TEXT)elif D_DEBUG['解析用时统计'].strip() != '':标题, TEXT = 统计分析_解析用时(D_UDP_DNS_用时)with open(D_DEBUG['解析用时统计'], 'a', encoding='utf-8') as f:f.write(标题+'\n')f.write(TEXT)print(f"解析用时统计 保存到 {os.getcwd()} 目录下 {D_DEBUG['解析用时统计']} 文件中")if D_DEBUG['解析状态统计'] == 'P':标题, TEXT = 统计分析_解析状态(D_UDP_DNS_响应码)打印_青(f"\n解析状态{'-'*92}")打印_黄(标题)print(TEXT)elif D_DEBUG['解析状态统计'].strip() != '':标题, TEXT = 统计分析_解析状态(D_UDP_DNS_响应码)with open(D_DEBUG['解析状态统计'], 'a', encoding='utf-8') as f:f.write(标题+'\n')f.write(TEXT)print(f"解析状态统计 保存到 {os.getcwd()} 目录下 {D_DEBUG['解析状态统计']} 文件中")if D_DEBUG['查询域名统计_分计'] == 'P':标题, TEXT = 统计分析_查询域名_分计(D_UDP_DNS_查域名)打印_青(f"\n查询域名统计_分计")打印_黄(标题)print(TEXT)elif D_DEBUG['查询域名统计_分计'].strip() != '':标题, TEXT = 统计分析_查询域名_分计(D_UDP_DNS_查域名)with open(D_DEBUG['查询域名统计_分计'], 'a', encoding='utf-8') as f:f.write(标题+'\n')f.write(TEXT)print(f"查询域名统计_分计 保存到 {os.getcwd()} 目录下 {D_DEBUG['查询域名统计_分计']} 文件中")if D_DEBUG['查询域名统计_总计'] == 'P':标题, TEXT = 统计分析_查询域名_总计(D_查询域名_ALL)打印_青(f"\n查询域名统计_总计")打印_黄(标题)print(TEXT)elif D_DEBUG['查询域名统计_总计'].strip() != '':标题, TEXT = 统计分析_查询域名_总计(D_查询域名_ALL)with open(D_DEBUG['查询域名统计_总计'], 'a', encoding='utf-8') as f:f.write(标题+'\n')f.write(TEXT)print(f"查询域名统计_总计 保存到 {os.getcwd()} 目录下 {D_DEBUG['查询域名统计_总计']} 文件中")def 查看指定DNS标识数据包(D_UDP_DNS, P_指定DNS标志, 备注):打印_黄(f"查看指定DNS标识数据包 {备注}")打印_紫(" ID TIME SrcIP DstIP SPort DPort LEN 校验结果 标志 QR 操作码 权威解析 截断 C递归 S查询 保留 S数签 S数签 响应码 问题 答案 授权 附加 查询/结果")for KEY in D_UDP_DNS:SrcIP,SrcPort,DstIP,DstPort = KEYfor 包编号,时间戳,包长度,UDP_校验结果, 标识, QR,操作码,AA,TC,RD,RA,Z,AD,CD,响应码, 问题记录数,答案记录数,授权信息记录数,附加信息记录数, L_问题记录,L_结果地址,L_授权信息记录,L_附加信息记录 in D_UDP_DNS[KEY]:if 标识 in P_指定DNS标志:TEXT_DNS_INFO = f"{标识:5} {D_DNS_FLAGS_简['QR'][QR]} {D_DNS_FLAGS_简['操作码'][操作码]} {D_DNS_FLAGS_简['AA'][AA]} {D_DNS_FLAGS_简['TC'][TC]} {D_DNS_FLAGS_简['RD'][RD]} {D_DNS_FLAGS_简['RA'][RA]} {D_DNS_FLAGS_简['Z'][Z]} {D_DNS_FLAGS_简['AD'][AD]} {D_DNS_FLAGS_简['CD'][CD]} {D_DNS_FLAGS_简['响应码'][响应码]} {问题记录数:4} {答案记录数:4} {授权信息记录数:4} {附加信息记录数:4}"if QR == 0:TEXT = f"{包编号:6} {时间戳:17.6f} {SrcIP:15s} {DstIP:15s} {SrcPort:5} {DstPort:5} {包长度:4} {UDP_校验结果} {TEXT_DNS_INFO} {L_问题记录}"if UDP_校验结果 == '校验成功':打印_绿(TEXT)if L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")else:打印_灰底绿字(TEXT)if L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")else:TEXT = f"{包编号:6} {时间戳:17.6f} {DstIP:15s} {SrcIP:15s} {DstPort:5} {SrcPort:5} {包长度:4} {UDP_校验结果} {TEXT_DNS_INFO} {L_结果地址}"if UDP_校验结果 == '校验成功':打印_青(TEXT)if L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")else:打印_灰底青字(TEXT)if L_问题记录 != []:打印_灰(f"{包编号:6} 问题记录 {L_问题记录}")if L_结果地址 != []:打印_灰(f"{包编号:6} 结果地址 {L_结果地址}")if L_授权信息记录 != []:打印_灰(f"{包编号:6} 授权信息记录 {L_授权信息记录}")if L_附加信息记录 != []:打印_灰(f"{包编号:6} 附加信息记录 {L_附加信息记录}")def RUN(L_PATH_PCAP_FILE, D_DNS_FLAGS, D_DEBUG, P_SELECT_PORT, P_SELECT_IP, P_解析指定包编号, P_指定DNS标志, P_DNS_PORT, D_FrameType, D_IP_Protocol):打印_黄(" ID TIME SrcIP DstIP SPort DPort LEN 校验码 校验值 校验结果 标志 QR 操作码 AA TC RD RA Z AD CD 响应码 问题 答案 授权 附加 L_查询域名/L_结果地址")D_PCAP,D_IPv4_UDP_DNS,D_IPv6_UDP_DNS = 解析PCAP文件中的UDP(L_PATH_PCAP_FILE, P_解析指定包编号, D_DNS_FLAGS, D_DEBUG, P_SELECT_PORT, P_SELECT_IP, P_DNS_PORT)#打印_灰(D_PCAP)#打印_灰(D_IPv4_UDP_DNS)#打印_灰(D_IPv6_UDP_DNS)if P_指定DNS标志 != set():查看指定DNS标识数据包(D_IPv4_UDP_DNS, P_指定DNS标志, 'IPv4')查看指定DNS标识数据包(D_IPv6_UDP_DNS, P_指定DNS标志, 'IPv6')## 统计分析统计分析(D_IPv4_UDP_DNS, D_DEBUG, 'IPv4')统计分析(D_IPv6_UDP_DNS, D_DEBUG, 'IPv6')打印_黄("\n抓包数据统计")打印_黄(f" VLAN IEEE_802.3 IPv4 IPv6 TCP UDP")打印_绿(f"{D_PCAP['VLAN']:5} {D_PCAP['IEEE 802.3']:10} {D_PCAP['IPv4']:5} {D_PCAP['IPv6']:5} {D_PCAP['TCP']:5} {D_PCAP['UDP']:5}")打印_黄(f"其他 FrameType")#print(f"OTHER_FrameType={D_PCAP['OTHER_FrameType']}")for K in D_PCAP['OTHER_FrameType']:if K in D_FrameType:print(f"{D_FrameType[K]:11s} {D_PCAP['OTHER_FrameType'][K]}")else:print(f"{K} {D_PCAP['OTHER_FrameType'][K]}")打印_黄(f"其他 IP_Protocol")#print(f"OTHER_IP_Protocol={D_PCAP['OTHER_IP_Protocol']}")for K in D_PCAP['OTHER_IP_Protocol']:if K in D_IP_Protocol:print(f"{D_IP_Protocol[K]:11s} {D_PCAP['OTHER_IP_Protocol'][K]}")else:print(f"{K} {D_PCAP['OTHER_IP_Protocol'][K]}")打印_黄(f"D_PCAP['忽略_包编号_包数量']={D_PCAP['忽略_包编号_包数量']}")打印_黄(f"D_PCAP['忽略_地址_包数量']={D_PCAP['忽略_地址_包数量']}")打印_黄(f"D_PCAP['忽略_端口_包数量']={D_PCAP['忽略_端口_包数量']}")打印_红(f"D_PCAP['UDP_DNS_IPv4_ERROR'] 数量={len(D_PCAP['UDP_DNS_IPv4_ERROR'])}")打印_红(f"D_PCAP['UDP_DNS_IPv4_ERROR']={D_PCAP['UDP_DNS_IPv4_ERROR']}")打印_红(f"D_PCAP['UDP_DNS_IPv6_ERROR'] 数量={len(D_PCAP['UDP_DNS_IPv6_ERROR'])}")打印_红(f"D_PCAP['UDP_DNS_IPv6_ERROR']={D_PCAP['UDP_DNS_IPv6_ERROR']}")打印_绿(f"D_PCAP['UDP校验成功']={D_PCAP['UDP校验成功']}")打印_红(f"D_PCAP['UDP校验失败']={D_PCAP['UDP校验失败']}")if __name__ == '__main__':## DNS 标志含义D_DNS_FLAGS = {}D_DNS_FLAGS['QR'] = {0:'查询', 1:'响应'}D_DNS_FLAGS['操作码'] = {0:'正向查询',1:'反向查询',2:'状态查询',3:'无',4:'通知',5:'更新',6:'保留',7:'保留',8:'保留',9:'保留',10:'保留',11:'保留',12:'保留',13:'保留',14:'保留',15:'保留'}D_DNS_FLAGS['AA'] = {0:'权威解析(否)', 1:'权威解析(是)'}D_DNS_FLAGS['TC'] = {0:'截断(否)', 1:'截断(是)'}D_DNS_FLAGS['RD'] = {0:'C期望递归(否)', 1:'C期望递归(是)'}D_DNS_FLAGS['RA'] = {0:'S支持递归(否)', 1:'S支持递归(是)'}D_DNS_FLAGS['Z'] = {0:'保留(正常)', 1:'保留(异常)'}D_DNS_FLAGS['AD'] = {0:'S数字签名(未验证)', 1:'S数字签名(已验证)'}D_DNS_FLAGS['CD'] = {0:'S数字签名(已验证)', 1:'S数字签名(未验证)'}D_DNS_FLAGS['响应码'] = {0:'正常',1:'报文格式错误',2:'服务器失败',3:'名字差错',4:'没有实现',5:'拒绝',6:'不该出现域名',7:'集合 RR 存在但是他不该存在',8:'集合 RR 不存在但是他应该存在',9:'服务器并不是这个区域的权威服务器',10:'该名称并不包含在区域中',11:'保留',12:'保留',13:'保留',14:'保留',15:'保留',16:'错误的 OPT 版本或者 TSIG 签名无效',17:'无法识别的密钥',18:'签名不在时间范围内',19:'错误的 TKEY 模式',20:'重复的密钥名称',21:'该算法不支持',22:'错误的截断'}D_DNS_FLAGS_简 = {}D_DNS_FLAGS_简['QR'] = {0:'查询', 1:'响应'}D_DNS_FLAGS_简['操作码'] = {0:'正向查询',1:'反向查询',2:'状态查询',3:'无',4:'通知',5:'更新',6:'保留',7:'保留',8:'保留',9:'保留',10:'保留',11:'保留',12:'保留',13:'保留',14:'保留',15:'保留'}D_DNS_FLAGS_简['AA'] = {0:'否', 1:'是'}D_DNS_FLAGS_简['TC'] = {0:'否', 1:'是'}D_DNS_FLAGS_简['RD'] = {0:'否', 1:'是'}D_DNS_FLAGS_简['RA'] = {0:'否', 1:'是'}D_DNS_FLAGS_简['Z'] = {0:'正常', 1:'异常'}D_DNS_FLAGS_简['AD'] = {0:'未验', 1:'已验'}D_DNS_FLAGS_简['CD'] = {0:'已验', 1:'未验'}D_DNS_FLAGS_简['响应码'] = {0:'正常',1:'报文格式错误',2:'S失败',3:'名字差错',4:'没有实现',5:'拒绝',6:'不该出现域名',7:'集合 RR 存在但是他不该存在',8:'集合 RR 不存在但是他应该存在',9:'服务器并不是这个区域的权威服务器',10:'该名称并不包含在区域中',11:'保留',12:'保留',13:'保留',14:'保留',15:'保留',16:'错误的 OPT 版本或者 TSIG 签名无效',17:'无法识别的密钥',18:'签名不在时间范围内',19:'错误的 TKEY 模式',20:'重复的密钥名称',21:'该算法不支持',22:'错误的截断'}## 常见以太帧类型D_FrameType = {b'\x08\x06':'ARP', b'\x88\xcc':'LLDP', b'\x08\x00':'IPv4', b'\x81\x00':'VLAN', b'\x86\xdd':'IPv6'}## 常见IP协议D_IP_Protocol = {1:'ICMP', 2:'IGMP', 6:'TCP', 17:'UDP'}## 调试:打印或记录到文件D_DEBUG = {}D_DEBUG['DEBUG'] = 'P' # 总开关 'P':打印,'':空忽略D_DEBUG['包基本信息IPv4'] = 'P' # 'P':打印,'':空忽略,'非空值':作为文件名保存记录到文件(如'包基本信息IPv4.txt')D_DEBUG['包基本信息IPv6'] = 'P' # 'P':打印,'':空忽略,'非空值':作为文件名保存记录到文件(如'包基本信息IPv4.txt')【注】IPv6 地址显示太长,只截取前15字符显示D_DEBUG['问题记录'] = ''D_DEBUG['结果地址'] = ''D_DEBUG['授权信息记录'] = ''D_DEBUG['附加信息记录'] = ''D_DEBUG['解析失败详情'] = ''D_DEBUG['解析用时统计'] = 'P' # 'P':打印,'':空忽略,'非空值':作为文件名保存记录到文件(如'解析用时统计.txt')D_DEBUG['解析状态统计'] = 'P' # 'P':打印,'':空忽略,'非空值':作为文件名保存记录到文件(如'解析状态统计.txt')D_DEBUG['查询域名统计_分计'] = 'P' # 'P':打印,'':空忽略,'非空值':作为文件名保存记录到文件(如'查询域名统计_分计.txt')D_DEBUG['查询域名统计_总计'] = 'P' # 'P':打印,'':空忽略,'非空值':作为文件名保存记录到文件(如'查询域名统计_总计.txt')D_DEBUG['重复查询'] = '' # 丢包或无应答会再次发查询请求包D_DEBUG['重复应答'] = ''## 指定保存记录的目录路径,路径不存在则创建目录,为空或创建失败则使用脚本当前目录DIR_WORK = 'D:\\TEST\\PCAP\\UDP'## 指定pcap文件路径或文件夹路径PCAP_File_OR_Dir = 'E:\\TEST\\PCAP\\test.pcap' # 具体文件路径#PCAP_File_OR_Dir = 'E:\\TEST\\PCAP\\DIR' # 存放PCAP文件的文件夹路径## 筛选解析指定数据包P_解析指定包编号 = set() # 全部处理#P_解析指定包编号 = set([i for i in range(100)]) # 只处理前99个包#P_解析指定包编号 = set([1,3,5]) # 只处理指定包## 筛选指定IP或PORT的数据SELECT_PORT = '' # ''空字符:不刷选,填入具体端口号进行筛选,多个端口号以空格分隔(如 '60001' 或 '12345 54321')SELECT_IP = '' # ''空字符:不刷选,填入具体IP进行筛选,多个IP以空格分隔(如 '192.168.0.2' 或 '192.168.1.100 192.168.200.1')P_SELECT_PORT = set([int(i) for i in SELECT_PORT.split()])P_SELECT_IP = set([i for i in SELECT_IP.split()])## 显示指定查询标识数据包P_指定DNS标志 = set()#P_指定DNS标志 = set([0])## DNS包端口,遇到这些端口就按DNS数据解析## 53/UDP (DNS)(Domain Name System)域名解析系统## 137/UDP (NBNS)(NetBIOS Name System)网络基本输入/输出系统(NetBIOS)名称服务器 ## 5353/UDP (mDNS)(Multicast DNS)组播DNS## 5355/UDP (LLMNR)(Link-Local Multicast Name Resolution)链路本地多播名称解析P_DNS_PORT = set([53, 137, 5353, 5355]) # 默认值## 运行初始化目录(DIR_WORK, D_DEBUG)指定扩展名 = 'pcap'L_PATH_PCAP_FILE = 生成处理文件列表(PCAP_File_OR_Dir, 指定扩展名)for i in L_PATH_PCAP_FILE:打印_青(i)X = input(f"以上是处理pacp文件顺序,数量={len(L_PATH_PCAP_FILE)},输入'y'或'Y'继续: ")if X in ('y', 'Y'):RUN(L_PATH_PCAP_FILE, D_DNS_FLAGS, D_DEBUG, P_SELECT_PORT, P_SELECT_IP, P_解析指定包编号, P_指定DNS标志, P_DNS_PORT, D_FrameType, D_IP_Protocol)'''
参考
Ethertype
(十六进制) 协议
0x0000 - 0x05DC IEEE 802.3 长度
0x0101 – 0x01FF 实验
0x0600 XEROX NS IDP
0x0660 DLOG
0x0661 DLOG
0x0800 网际协议(IP)
0x0801 X.75 Internet
0x0802 NBS Internet
0x0803 ECMA Internet
0x0804 Chaosnet
0x0805 X.25 Level 3
0x0806 地址解析协议(ARP : Address Resolution Protocol)
0x0808 帧中继 ARP (Frame Relay ARP) [RFC1701]
0x6559 原始帧中继(Raw Frame Relay) [RFC1701]
0x8035 动态 DARP (DRARP:Dynamic RARP)反向地址解析协议(RARP:Reverse Address Resolution Protocol)
0x8037 Novell Netware IPX
0x809B EtherTalk
0x80D5 IBM SNA Services over Ethernet
0x80F3 AppleTalk 地址解析协议(AARP:AppleTalk Address Resolution Protocol)
0x8100 VLAN
0x8137 因特网包交换(IPX:Internet Packet Exchange)
0x814C 简单网络管理协议(SNMP:Simple Network Management Protocol)
0x86DD 网际协议v6 (IPv6,Internet Protocol version 6)
0x880B 点对点协议(PPP:Point-to-Point Protocol)
0x880C 通用交换管理协议(GSMP:General Switch Management Protocol)
0x8847 多协议标签交换(单播) MPLS:Multi-Protocol Label Switching <unicast>)
0x8848 多协议标签交换(组播)(MPLS, Multi-Protocol Label Switching <multicast>)
0x8863 以太网上的 PPP(发现阶段)(PPPoE:PPP Over Ethernet <Discovery Stage>)
0x8864 以太网上的 PPP(PPP 会话阶段) (PPPoE,PPP Over Ethernet<PPP Session Stage>)
0x88BB 轻量级访问点协议(LWAPP:Light Weight Access Point Protocol)
0x88CC 链接层发现协议(LLDP:Link Layer Discovery Protocol)
0x8E88 局域网上的 EAP(EAPOL:EAP over LAN)
0x9000 配置测试协议(Loopback)
0x9100 VLAN 标签协议标识符(VLAN Tag Protocol Identifier)
0x9200 VLAN 标签协议标识符(VLAN Tag Protocol Identifier)
0xFFFF 保留
'''