系列文章目录
sigrokdecode 模块学习指南 — 准备阶段
通讯协议 - Uart
sigrokdecode 模块
UART协议解码器源码解析
Uart协议解码器源码各个方法
`
文章目录
- 系列文章目录
- 引入库
- parity_ok
- 注解类型枚举
- options参数
- annotations 注解
- annotation_rows 注解分组
- 接收(RX)相关行
- 发送(TX)相关行
- put 输出方法
- putx(self, rxtx, data)
- putx_packet(self, rxtx, data)
- putpx(self, rxtx, data)
- putg(self, data)
- putp(self, data)
- putgse(self, ss, es, data)
- 工作流程示例
- reset
- 示例场景
- start
- Python对象通道
- 定义与用途
- 数据形式
- 逻辑分析仪注释通道
- **定义与用途**
- **数据形式**
- 示例场景
- wait_for_start_bit
- get_start_bit
- 记录起始位信号
- 验证起始位有效性
- handle_packet
- 确定配置参数
- 缓存数据并记录起始样本点
- 判断数据包结束条件
- get_data_bits
- format_value
- get_parity_bit
- 保存校验位值并更新位计数器
- 校验位验证
- 状态转移
- get_stop_bits
- 记录停止位值并更新位计数器
- 验证停止位合法性
- 输出停止位信息
- 检查停止位数量
- get_wait_cond
- 取当前状态
- 处理「等待起始位」状态
- 处理「数据/校验/停止位」状态
- get_idle_cond
- inspect_sample
- 信号反转处理
- 状态路由与解析
- 执行流程示例
- inspect_edge
- decode
- 初始校验
- 信号反转配置
- 帧参数计算
- 主事件循环
- 步骤分解
'''
输出格式说明(OUTPUT_PYTHON):
每个数据包格式为 [<ptype>, <rxtx>, <pdata>]
<ptype> 类型包括:- 'STARTBIT': 起始位值(0/1)- 'DATA': 数据值和各比特位的起止样本号- 'PARITYBIT': 校验位值- 'STOPBIT': 停止位值- 'INVALID ...': 无效位错误- 'PARITY ERROR': 校验错误- 'BREAK': 线路中断- 'FRAME': 完整帧数据及有效性- 'IDLE': 空闲状态
<rxtx> 表示方向:0(RX接收)或1(TX发送)
'''
''' DATA = (13, [(1, ss=200, es=250), (0, ss=150, es=200), (1, ss=100, es=150), (1, ss=50, es=100)])
LSB到MSB顺序:传输顺序为 1 (LSB) → 0 → 1 → 1 (MSB)。'''
引入库
# 引入必要的库
import sigrokdecode as srd # Sigrok 解码器基础库
from common.srdhelper import bitpack # 辅助函数:将位列表转为整数值
from math import floor, ceil # 数学函数
parity_ok
# 校验位检查函数
def parity_ok(parity_type, parity_bit, data, data_bits):# 忽略校验:直接返回Trueif parity_type == 'ignore': return True# 根据校验类型判断校验位是否正确if parity_type == 'zero': return parity_bit == 0elif parity_type == 'one': return parity_bit == 1# 计算1的个数(数据+校验位)ones = bin(data).count('1') + parity_bit # 奇校验:1的总数为奇数if parity_type == 'odd': return (ones % 2) == 1elif parity_type == 'even': return (ones % 2) == 0
参数 | 含义 | 取值范围 | 作用 |
---|---|---|---|
parity_type | 校验类型 | ignore /zero /one /odd /even | 决定校验规则 |
parity_bit | 接收到的校验位值 | 0 或 1 | 需根据校验类型验证合法性 |
data | 数据整数值 | 由data_bits 决定(如0~255) | 提供原始数据用于计算1的数量 |
data_bits | 数据位长度 | 5 ~9 | 确保正确解析数据的二进制位数 |
- 函数用于验证 UART 协议中的奇偶校验位是否正确。
- 忽略校验:直接返回
True
。 - 固定校验值:强制校验位为
0
或1
。 - 奇偶校验:统计数据位和校验位中
1
的总数
注解类型枚举
# 自定义异常类
class SamplerateError(Exception): pass # 采样率未设置错误
class ChannelError(Exception): pass # 通道未配置错误# 注解类型枚举(用于在GUI中高亮显示UART帧的不同部分(如起始位、数据位)。)
class Ann:RX_DATA, TX_DATA, RX_START, TX_START, RX_PARITY_OK, TX_PARITY_OK, \RX_PARITY_ERR, TX_PARITY_ERR, RX_STOP, TX_STOP, RX_WARN, TX_WARN, \RX_DATA_BIT, TX_DATA_BIT, RX_BREAK, TX_BREAK, RX_PACKET, TX_PACKET = range(18)# 二进制输出类型 原始数据的转储,便于后续分析
class Bin:RX, TX, RXTX = range(3) # 分别对应接收、发送、双向数据
range(3)
:生成一个整数序列[0, 1, 2]
。
RX_DATA | 0 | 接收端完整数据字节 | 标注接收到的完整数据字节(如 0x55 ) |
---|---|---|---|
TX_DATA | 1 | 发送端完整数据字节 | 标注发送的完整数据字节 |
RX_START | 2 | 接收起始位(Start Bit) | 标注接收通道的起始位(低电平信号) |
TX_START | 3 | 发送起始位 | 标注发送通道的起始位 |
RX_PARITY_OK | 4 | 接收校验正确 | 校验位验证通过时的标注(如偶校验正确) |
TX_PARITY_OK | 5 | 发送校验正确 | 发送端校验正确 |
RX_PARITY_ERR | 6 | 接收校验错误 | 校验位验证失败时的错误标注 |
TX_PARITY_ERR | 7 | 发送校验错误 | 发送端校验错误 |
RX_STOP | 8 | 接收停止位(Stop Bit) | 标注接收通道的停止位(高电平信号) |
TX_STOP | 9 | 发送停止位 | 标注发送通道的停止位 |
RX_WARN | 10 | 接收警告 | 接收异常(如停止位电平错误) |
TX_WARN | 11 | 发送警告 | 发送端异常 |
RX_DATA_BIT | 12 | 接收的单个数据位 | 标注接收的每个二进制位(如 0 或 1 ) |
TX_DATA_BIT | 13 | 发送的单个数据位 | 标注发送的每个二进制位 |
RX_BREAK | 14 | 接收线路中断(BREAK 状态) | 检测到线路长时间低电平(BREAK 信号) |
TX_BREAK | 15 | 发送线路中断 | 发送端 BREAK 信号 |
RX_PACKET | 16 | 接收完整数据包 | 标注多字节组成的完整数据包(如 [0xAA, 0x55] ) |
TX_PACKET | 17 | 发送完整数据包 | 标注发送的完整数据包 |
options参数
# 用户可配置参数options = ({'id': 'baudrate', 'desc': '波特率', 'default': 115200},{'id': 'data_bits', 'desc': '数据位', 'default': 8, 'values': (5, 6, 7, 8, 9)},{'id': 'parity', 'desc': '校验类型', 'default': 'none', 'values': ('none', 'odd', 'even', 'zero', 'one', 'ignore')},{'id': 'stop_bits', 'desc': '停止位', 'default': 1.0, 'values': (0.0, 0.5, 1.0, 1.5, 2.0)},{'id': 'bit_order', 'desc': '位顺序', 'default': 'lsb-first', 'values': ('lsb-first', 'msb-first')},{'id': 'format', 'desc': '数据格式', 'default': 'hex', 'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},# ... 其他选项(省略) ...)
参数ID (id ) | 描述 (desc ) | 默认值 (default ) | 可选值 (values ) | 说明 |
---|---|---|---|---|
baudrate | 波特率 | 115200 | - | 每秒传输的符号数(常用值如9600、115200)。 |
data_bits | 数据位 | 8 | (5, 6, 7, 8, 9) | 每个数据帧的位数(通常为8位)。 |
parity | 校验类型 | ‘none’ | (‘none’, ‘odd’, ‘even’, ‘zero’, ‘one’, ‘ignore’) | 校验位规则(见奇偶校验详解)。 |
stop_bits | 停止位 | 1.0 | (0.0, 0.5, 1.0, 1.5, 2.0) | 停止位的持续时间(通常为1或2位时间)。 |
bit_order | 位传输顺序 | ‘lsb-first’ | (‘lsb-first’, ‘msb-first’) | LSB(最低有效位)先传输是UART标准。 |
format | 数据格式 | ‘hex’ | (‘ascii’, ‘dec’, ‘hex’, ‘oct’, ‘bin’) | 数据以ASCII、十进制、十六进制、八进制或二进制格式显示。 |
invert_rx | 反转RX信号电平 | ‘no’ | (‘yes’, ‘no’) | 若为’yes’,接收信号电平逻辑取反(高→低,低→高)。 |
invert_tx | 反转TX信号电平 | ‘no’ | (‘yes’, ‘no’) | 若为’yes’,发送信号电平逻辑取反。 |
sample_point | 采样点位置(百分比) | 50 | - | 在位的中间位置(50%)采样以提高抗噪性。 |
rx_packet_delim | RX数据包分隔符(十进制) | -1 | - | 分隔符的ASCII值(如-1 表示无分隔符,10 表示换行符)。 |
tx_packet_delim | TX数据包分隔符(十进制) | -1 | - | 同RX分隔符,用于发送。 |
rx_packet_len | RX数据包长度 | -1 | - | 固定长度数据包(如-1 表示可变长度)。 |
tx_packet_len | TX数据包长度 | -1 | - | 同RX数据包长度。 |
annotations 注解
# 注解定义(界面显示的文本标签)annotations = (('rx-data', '接收数据'), ('tx-data', '发送数据'),('rx-start', '接收起始位'), ('tx-start', '发送起始位'),# ... 其他注解(省略) ...)
标识符 (ID) | 描述 (Description) | 含义与应用场景 |
---|---|---|
rx-data | RX data | 接收数据:标识接收到的数据位部分,通常对应数据帧的有效信息。 |
tx-data | TX data | 发送数据:标识发送的数据位部分,标记正在传输的有效信息。 |
rx-start | RX start bit | 接收起始位:标识接收到的起始位(低电平),用于同步数据接收。 |
tx-start | TX start bit | 发送起始位:标识发送的起始位(低电平),标志数据帧开始传输。 |
rx-parity-ok | RX parity OK bit | 接收校验成功:校验位验证通过(奇偶校验正确),数据无错误。 |
tx-parity-ok | TX parity OK bit | 发送校验成功:发送端计算的校验位符合规则,数据已正确添加校验。 |
rx-parity-err | RX parity error bit | 接收校验失败:校验位验证失败,数据可能存在传输错误。 |
tx-parity-err | TX parity error bit | 发送校验失败:发送端校验计算异常(通常为配置错误或逻辑问题)。 |
rx-stop | RX stop bit | 接收停止位:标识接收到的停止位(高电平),标志数据帧结束。 |
tx-stop | TX stop bit | 发送停止位:标识发送的停止位(高电平),确保数据帧结束。 |
rx-warning | RX warning | 接收警告:指示接收过程中的非致命异常(如波特率轻微偏差、噪声干扰)。 |
tx-warning | TX warning | 发送警告:指示发送过程中的潜在问题(如缓冲区溢出、发送超时)。 |
rx-data-bit | RX data bit | 接收数据位:细分到单个数据位的接收状态,用于位级调试。 |
tx-data-bit | TX data bit | 发送数据位:细分到单个数据位的发送状态,用于位级分析。 |
rx-break | RX break | 接收中断:检测到线路持续低电平超过一帧时间(BREAK条件),可能表示通信终止或复位信号。 |
tx-break | TX break | 发送中断:主动发送BREAK信号,用于通知接收端通信中断。 |
rx-packet | RX packet | 接收数据包:标识完整接收到的数据包(包含起始位、数据位、校验位、停止位)。 |
tx-packet | TX packet | 发送数据包:标识完整发送的数据包,用于跟踪完整帧的传输过程。 |
annotation_rows 注解分组
# 注解分组(在界面中归类显示)annotation_rows = (('rx-data-bits', '接收位', (Ann.RX_DATA_BIT,)),('rx-data-vals', '接收数据', (Ann.RX_DATA, Ann.RX_START, Ann.RX_PARITY_OK, Ann.RX_PARITY_ERR, Ann.RX_STOP)),# ... 其他分组(省略) ...)
行分类 | 核心作用 | 典型使用场景 |
---|---|---|
数据位行 | 位级调试(时序、电平) | 硬件信号完整性分析 |
数据帧行 | 完整帧解析(起始、数据、校验、停止) | 协议合规性验证 |
警告/错误行 | 异常检测与诊断 | 通信稳定性测试 |
BREAK行 | 中断信号处理 | 设备复位或通信终止检测 |
数据包行 | 聚合帧数据展示 | 应用层数据处理(如ASCII消息解析) |
接收(RX)相关行
行ID | 行描述 | 包含的注解常量 | 作用 |
---|---|---|---|
rx-data-bits | RX bits | Ann.RX_DATA_BIT | 显示每个接收数据位的详细信息(如位值、时序)。 |
rx-data-vals | RX data | Ann.RX_DATA, Ann.RX_START, Ann.RX_PARITY_OK, Ann.RX_PARITY_ERR, Ann.RX_STOP | 标记完整接收帧(起始位、数据、校验、停止位)。 |
rx-warnings | RX warnings | Ann.RX_WARN | 接收过程中的警告(如波特率偏差、噪声干扰)。 |
rx-breaks | RX breaks | Ann.RX_BREAK | 接收到的BREAK信号(持续低电平中断)。 |
rx-packets | RX packets | Ann.RX_PACKET | 完整接收的数据包(包含所有位的聚合信息)。 |
发送(TX)相关行
行ID | 行描述 | 包含的注解常量 | 作用 |
---|---|---|---|
tx-data-bits | TX bits | Ann.TX_DATA_BIT | 显示每个发送数据位的详细信息。 |
tx-data-vals | TX data | Ann.TX_DATA, Ann.TX_START, Ann.TX_PARITY_OK, Ann.TX_PARITY_ERR, Ann.TX_STOP | 标记完整发送帧(起始位、数据、校验、停止位)。 |
tx-warnings | TX warnings | Ann.TX_WARN | 发送过程中的警告(如缓冲区溢出、发送超时)。 |
tx-breaks | TX breaks | Ann.TX_BREAK | 主动发送的BREAK信号。 |
tx-packets | TX packets | Ann.TX_PACKET | 完整发送的数据包。 |
put 输出方法
def putx(self, rxtx, data):s, halfbit = self.startsample[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)def putx_packet(self, rxtx, data):s, halfbit = self.ss_packet[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)def putpx(self, rxtx, data):s, halfbit = self.startsample[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_python, data)def putg(self, data):s, halfbit = self.samplenum, self.bit_width / 2.0self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)def putp(self, data):s, halfbit = self.samplenum, self.bit_width / 2.0self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)def putgse(self, ss, es, data):self.put(ss, es, self.out_ann, data)
方法名 | 核心用途 | 输出通道 | 时间范围计算方式 |
---|---|---|---|
putx | 标注方向性单一位事件 | 逻辑分析仪注释 | 基于通道起始采样点扩展半位宽 |
putx_packet | 标注完整数据包 | 逻辑分析仪注释 | 基于数据包起始采样点扩展半位宽 |
putpx | 输出方向性结构化数据 | Python对象 | 同 putx |
putg | 标注当前瞬时事件 | 逻辑分析仪注释 | 以当前采样点为中心扩展半位宽 |
putp | 输出当前瞬时结构化数据 | Python对象 | 同 putg |
putgse | 自定义时间范围标注 | 逻辑分析仪注释 | 直接指定起止采样点 |
putx(self, rxtx, data)
- 功能:标注单个数据位或控制位(如起始位、停止位)的时间范围,基于通道(RX/TX)的起始采样点。
- 参数:
rxtx
:方向标识,取值为Bin.RX
(接收)或Bin.TX
(发送)。data
:要输出的注解数据(如(Ann.RX_DATA_BIT, 'Bit: 1')
)。
- 时间计算:
- 起始时间:
startsample[rxtx] - floor(halfbit)
(从通道的起始采样点向前扩展半位宽) - 结束时间:
self.samplenum + ceil(halfbit)
(当前采样点向后扩展半位宽)
- 起始时间:
- 输出目标:
self.out_ann
(逻辑分析仪注释通道)。 - 应用场景:
标记接收或发送的单个数据位,确保标注覆盖整个位的持续时间。
示例:在解析到第3个数据位时,调用putx(Bin.RX, (Ann.RX_DATA_BIT, 'Bit: 1'))
。
putx_packet(self, rxtx, data)
- 功能:标注完整数据包(如整个UART帧)的时间范围,基于数据包的起始采样点。
- 参数:
rxtx
:方向标识(同putx
)。data
:数据包相关注解(如(Ann.RX_PACKET, 'Packet: 0x41')
)。
- 时间计算:
- 起始时间:
ss_packet[rxtx] - floor(halfbit)
(数据包的起始采样点向前扩展半位宽) - 结束时间:
self.samplenum + ceil(halfbit)
(当前采样点向后扩展半位宽)
- 起始时间:
- 输出目标:
self.out_ann
。 - 应用场景:
标记完整的接收或发送数据包(包含起始位、数据位、校验位、停止位)。
示例:完成一帧解析后,调用putx_packet(Bin.TX, (Ann.TX_PACKET, 'Data: A'))
。
putpx(self, rxtx, data)
- 功能:与
putx
类似,但输出到Python对象通道,用于生成结构化数据。 - 参数:同
putx
。 - 输出目标:
self.out_python
(Python对象输出通道)。 - 应用场景:
将解码后的原始数据(如字节值、校验结果)传递给其他Python模块处理,而非可视化展示。
示例:发送数据时,调用putpx(Bin.TX, {'type': 'data', 'value': 0x41})
。
putg(self, data)
- 功能:以当前采样点为中心,标注单个事件(如警告、校验结果)的时间范围。
- 参数:
data
:注解数据(如(Ann.RX_PARITY_ERR, 'Parity Error')
)。
- 时间计算:
- 起始时间:
self.samplenum - floor(halfbit)
(当前采样点向前扩展半位宽) - 结束时间:
self.samplenum + ceil(halfbit)
(当前采样点向后扩展半位宽)
- 起始时间:
- 输出目标:
self.out_ann
。 - 应用场景:
标记瞬时事件(如校验错误、警告),确保标注在波形中居中显示。
示例:检测到校验错误时,调用putg((Ann.RX_PARITY_ERR, 'Parity Error'))
。
putp(self, data)
- 功能:类似
putg
,但输出到Python对象通道。 - 参数:同
putg
。 - 输出目标:
self.out_python
。 - 应用场景:
将瞬时事件的结构化数据传递给其他处理逻辑。
示例:发送BREAK信号时,调用putp({'event': 'break', 'direction': 'rx'})
。
putgse(self, ss, es, data)
-
功能:自定义时间范围标注,直接指定起始和结束采样点。
-
参数:
ss
:起始采样点(整数)。es
:结束采样点(整数)。data
:注解数据。
-
输出目标:
self.out_ann
。 -
应用场景:
处理非标准时间范围的事件(如跨越多个位的BREAK信号)。
示例:标注一个持续低电平的BREAK信号:self.putgse(ss=100, es=200, data=(Ann.RX_BREAK, 'BREAK Detected'))
工作流程示例
- 检测起始位:当RX线路变为低电平时,记录
startsample[Bin.RX] = 当前采样点
。 - 解析数据位:逐个采样点读取数据位,调用
putx(Bin.RX, ...)
标注每个位。 - 校验与停止位:校验完成后,调用
putg(...)
标记校验结果。 - 完成数据包:调用
putx_packet(Bin.RX, ...)
输出完整帧信息。 - 输出到Python:同时调用
putpx
或putp
传递结构化数据。
reset
def reset(self):self.samplerate = Noneself.frame_start = [-1, -1]self.frame_valid = [None, None]self.cur_frame_bit = [None, None]self.startbit = [-1, -1]self.cur_data_bit = [0, 0]self.datavalue = [0, 0]self.paritybit = [-1, -1]self.stopbits = [[], []]self.startsample = [-1, -1]self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']self.databits = [[], []]self.break_start = [None, None]self.packet_cache = [[], []]self.ss_packet, self.es_packet = [None, None], [None, None]self.idle_start = [None, None]
变量 | 初始值 | 用途 | 初始值设计逻辑 |
---|---|---|---|
samplerate | None | 存储采样率(单位:Hz) | 未配置前未知,需外部设置。 |
frame_start | [-1, -1] | 接收(RX)和发送(TX)方向帧的起始样本点。 | -1 表示未检测到帧起始。 |
frame_valid | [None, None] | 标记当前帧是否有效(校验通过)。 | None 表示帧未解析完成或未验证。 |
cur_frame_bit | [None, None] | 当前处理的帧中的位数(如起始位=0,数据位=1~N)。 | None 表示未开始解析帧。 |
startbit | [-1, -1] | 起始位的样本点位置。 | -1 表示未检测到起始位。 |
cur_data_bit | [0, 0] | 当前处理的数据位索引(从0开始,对应LSB)。 | 数据位从第0位开始解析。 |
datavalue | [0, 0] | 当前帧的整数值(通过数据位计算)。 | 初始值为0,解析时按位累加。 |
paritybit | [-1, -1] | 校验位的值(0或1)。 | -1 表示未解析到校验位。 |
stopbits | [[], []] | 停止位的样本点范围列表(可能包含多个停止位)。 | 空列表表示未检测到停止位。 |
startsample | [-1, -1] | 帧的起始样本点(含起始位)。 | -1 表示帧未开始。 |
state | ['WAIT FOR START BIT', 'WAIT FOR START BIT'] | 解析器的状态机状态(RX和TX方向)。 | 空闲时持续检测起始位,符合UART协议逻辑。 |
databits | [[], []] | 解析出的数据位列表(如 [1, 0, 1] )。 | 空列表表示未解析到数据位。 |
break_start | [None, None] | BREAK信号(持续低电平)的起始样本点。 | None 表示未检测到BREAK。 |
packet_cache | [[], []] | 临时存储数据包内容(如多帧聚合)。 | 空列表表示无缓存数据。 |
ss_packet , es_packet | [None, None] , [None, None] | 数据包的起始和结束样本点。 | None 表示数据包未开始或未完成。 |
idle_start | [None, None] | 进入空闲状态的起始样本点(用于超时检测)。 | None 表示当前未处于空闲状态。 |
示例场景
接收一帧8位数据(值 0x41
,校验通过):
- 起始位:
startbit[0] = 100
,startsample[0] = 100
,state[0] = 'PROCESSING DATA BITS'
。 - 数据位:逐位解析,
cur_data_bit[0]
从0递增到7,databits[0] = [1, 0, 0, 0, 0, 0, 1, 0]
,datavalue[0] = 0x41
。 - 校验位:
paritybit[0] = 1
,校验计算通过,frame_valid[0] = True
。 - 停止位:
stopbits[0] = [180, 200]
。 - 数据包记录:
ss_packet[0] = 100
,es_packet[0] = 200
,packet_cache[0].append(0x41)
。
start
def start(self):self.out_python = self.register(srd.OUTPUT_PYTHON)self.out_binary = self.register(srd.OUTPUT_BINARY)self.out_ann = self.register(srd.OUTPUT_ANN)self.bw = (self.options['data_bits'] + 7) // 8
-
作用:向协议解析框架(如
sigrok
)注册三类输出通道,用于分发解析结果。 -
作用:根据配置的 数据位长度(
data_bits
),计算存储数据所需的 最小字节数。
Python对象通道
定义与用途
- 作用:将协议解析后的结构化数据(如完整数据包、校验结果、原始字节)传递给其他Python模块,用于自动化处理、存储或分析。
- 目标用户:软件开发工程师、自动化测试脚本,用于程序化数据消费。
数据形式
-
数据结构:通常为 字典(Dict) 或 自定义类实例,包含机器可读的字段:
- 事件类型:如
'data'
、'error'
、'packet'
。 - 数据内容:如字节值、ASCII字符串、十六进制数值。
- 元数据:时间戳、方向(RX/TX)、校验状态等。
- 事件类型:如
-
示例:
# 发送一个解析后的数据包到Python对象通道 packet_data = {'direction': 'rx', # 数据方向'timestamp': 1630450000.5, # 时间戳'data': b'\x41', # 原始字节(ASCII 'A')'parity_ok': True, # 校验结果'length': 8, # 数据位长度 } self.put(output=self.out_python, data=packet_data)
逻辑分析仪注释通道
定义与用途
- 作用:将协议解析后的关键事件(如数据位、起始位、校验结果等)以可视化注释的形式标注在逻辑分析仪的波形界面上,帮助用户直观理解信号含义。
- 目标用户:硬件工程师、调试人员,用于实时或离线分析信号时序和协议合规性。
数据形式
-
数据结构:通常为元组(Tuple) 或 特定对象,包含以下信息:
- 时间范围:起始和结束的采样点(或时间戳),确定注释在波形上的显示范围。
- 注释类型:预定义的常量(如
Ann.RX_DATA
表示接收数据),用于分类事件。 - 描述信息:可读的文本或数值(如数据位的值
'Bit: 1'
)。
-
示例:
# 标注一个接收数据位(值1)的时间范围和类型 annotation = (Ann.RX_DATA_BIT, # 注释类型(常量)'Bit: 1', # 描述信息 ) self.put(ss=100, es=150, output=self.out_ann, data=annotation)
- 在波形界面中,从样本点100到150的区间内显示“Bit: 1”。
def metadata(self, key, value):if key == srd.SRD_CONF_SAMPLERATE:self.samplerate = value# The width of one UART bit in number of samples.self.bit_width = float(self.samplerate) / float(self.options['baudrate'])
-
self.samplerate
- 作用:存储当前采样率(单位:Hz),用于后续时间与样本点转换。
- 示例:
samplerate = 1_000_000
表示每秒采集 100 万个样本。
-
self.bit_width
-
作用:表示每个 UART 位占用的样本数,直接影响解析精度和时序计算。
-
计算逻辑:
- 采样率(
samplerate
)决定时间分辨率。 - 波特率(
baudrate
)决定每秒传输的位数。 - 二者的比值即为每个位的样本数。
- 采样率(
-
示例:
-
若
samplerate=12 MHz
,baudrate=115200
,则:bit_width=12 000 000/115 200≈104.17 个样本/位
-
-
def get_sample_point(self, rxtx, bitnum):# Determine absolute sample number of a bit slot's sample point.# Counts for UART bits start from 0 (0 = start bit, 1..x = data,# x+1 = parity bit (if used) or the first stop bit, and so on).# Accept a position in the range of 1-99% of the full bit width.# Assume 50% for invalid input specs for backwards compatibility.perc = self.options['sample_point'] or 50if not perc or perc not in range(1, 100):perc = 50perc /= 100.0bitpos = (self.bit_width - 1) * percbitpos += self.frame_start[rxtx]bitpos += bitnum * self.bit_widthreturn bitpos
- 目的:根据配置的采样点百分比,确定某一位(如起始位、数据位、校验位)的采样位置(绝对样本数)。
rxtx | 枚举 | 通信方向(RX 或 TX ),用于获取对应方向的帧起始样本点。 |
---|---|---|
bitnum | 整数 | 位编号(0 = 起始位,1~N = 数据位,N+1 = 校验位/停止位)。 |
-
获取采样点百分比
perc = self.options['sample_point'] or 50 if not perc or perc not in range(1, 100):perc = 50 perc /= 100.0
- 默认值处理:若
sample_point
无效(非 1-99),默认使用 50%。 - 百分比转换:将百分比转换为小数(如 50% → 0.5)。
- 默认值处理:若
-
计算位内采样偏移
bitpos = (self.bit_width - 1) * perc
- 位宽调整:
self.bit_width - 1
确保采样点在位的时间范围内(如位宽为 10 样本时,采样点范围是 0~9)。
- 位宽调整:
-
确定绝对位置
bitpos += self.frame_start[rxtx] bitpos += bitnum * self.bit_width
- 帧起始偏移:从帧的起始样本点开始。
- 位编号偏移:累加前序位的总样本数(如
bitnum=2
表示跨越 2 个完整位)。
示例场景
配置参数:
- 波特率
115200
,采样率1,000,000 Hz
→ 位宽8.68
样本/位。 - 采样点
70%
,方向RX
,帧起始样本点1000
,位编号1
(第 1 个数据位)。
计算过程:
-
位内偏移:
(8.68−1)×0.7≈5.376(8.68−1)×0.7≈5.376
-
绝对位置:
1000+5.376+1×8.68≈1014.0561000+5.376+1×8.68≈1014.056
-
最终采样点:约
1014
(取整)。
wait_for_start_bit
def wait_for_start_bit(self, rxtx, signal):# Save the sample number where the start bit begins.self.frame_start[rxtx] = self.samplenumself.frame_valid[rxtx] = Trueself.cur_frame_bit[rxtx] = 0
- 核心作用:当检测到起始位(低电平信号)时,初始化帧解析所需的关键参数,并标记帧开始。
- 触发条件:在 UART 空闲状态(高电平)中检测到信号跳变为低电平。
变量 | 类型 | 说明 |
---|---|---|
frame_start | 列表(int) | 存储 RX/TX 方向帧的起始样本点(如 [1000, -1] 表示 RX 帧从样本 1000 开始)。 |
frame_valid | 列表(bool) | 标记帧是否有效(如校验通过则为 True ,否则为 False )。 |
cur_frame_bit | 列表(int) | 当前处理的位索引(0 = 起始位,1~N = 数据位,N+1 = 校验位/停止位)。 |
samplenum | 整数 | 当前解析器处理的样本点编号(由框架自动更新)。 |
- 记录起始位样本点
self.frame_start[rxtx] = self.samplenum
将当前采样点(self.samplenum
)保存到对应方向(RX/TX)的frame_start
数组中,用于后续计算数据位、校验位和停止位的时间范围。
- 标记帧有效性
self.frame_valid[rxtx] = True
假设帧在起始阶段有效,后续可能根据校验结果(如奇偶校验、停止位验证)更新此标记。
- 初始化位计数器
self.cur_frame_bit[rxtx] = 0
从0
开始计数,表示当前处理的是起始位。数据位通常从索引1
开始递增。
get_start_bit
def get_start_bit(self, rxtx, signal):self.startbit[rxtx] = signalself.cur_frame_bit[rxtx] += 1# The startbit must be 0. If not, we report an error and wait# for the next start bit (assuming this one was spurious).if self.startbit[rxtx] != 0:self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = Falsees = self.samplenum + ceil(self.bit_width / 2.0)self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])self.advance_state(rxtx, signal, fatal = True, idle = es)return# Reset internal state for the pending UART frame.self.cur_data_bit[rxtx] = 0self.datavalue[rxtx] = 0self.paritybit[rxtx] = -1self.stopbits[rxtx].clear()self.startsample[rxtx] = -1self.databits[rxtx].clear()self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])self.advance_state(rxtx, signal)
startbit[rxtx] | 存储起始位电平值(应为 0 )。 |
---|---|
cur_frame_bit[rxtx] | 跟踪当前处理的帧位索引(起始位为 0 ,数据位从 1 开始)。 |
frame_valid[rxtx] | 标记帧有效性,初始为 True ,错误时置 False 。 |
putp / putg | 输出日志和注释,分别对应 Python 对象通道和逻辑分析仪注释通道。 |
advance_state | 状态机驱动方法,控制解析流程(如跳转到数据位或空闲状态)。 |
记录起始位信号
self.startbit[rxtx] = signal
self.cur_frame_bit[rxtx] += 1
- 目的:
- 保存当前信号值到
startbit
数组,标记起始位的电平状态。 - 递增当前帧的位计数器(
cur_frame_bit
),表示开始处理起始位。
- 保存当前信号值到
验证起始位有效性
if self.startbit[rxtx] != 0:# 错误处理:输出日志和警告注释self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])# 标记帧无效self.frame_valid[rxtx] = False# 计算错误帧的结束样本点es = self.samplenum + ceil(self.bit_width / 2.0)# 记录错误帧信息self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])# 推进状态机到空闲状态self.advance_state(rxtx, signal, fatal=True, idle=es)return
- 核心逻辑:python
- 若起始位非低电平(
signal != 0
),判定为无效起始位(可能由噪声引起)。 - 错误处理步骤:
- 输出日志:通过
putp
记录错误类型(INVALID STARTBIT
)、方向和信号值。 - 可视化警告:通过
putg
在逻辑分析仪界面生成错误注释(如Frame error
)。 - 标记帧无效:
frame_valid[rxtx] = False
,后续流程将忽略此帧。 - 记录错误帧范围:通过
putpse
标注错误帧的起止样本点,供调试参考。 - 状态转移:调用
advance_state
强制进入空闲状态(idle=es
),等待下一帧起始位。
- 输出日志:通过
- 若起始位非低电平(
##有效起始位处理**
# 重置内部状态
self.cur_data_bit[rxtx] = 0
self.datavalue[rxtx] = 0
self.paritybit[rxtx] = -1
self.stopbits[rxtx].clear()
self.startsample[rxtx] = -1
self.databits[rxtx].clear()# 输出起始位信息
self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])# 推进状态机到下一状态
self.advance_state(rxtx, signal)
- 目的:
- 状态初始化:清空或重置与帧解析相关的变量,确保无残留数据干扰新帧。
- 日志与注释:记录有效起始位事件,支持后续调试和可视化。
- 状态转移:调用
advance_state
进入数据位解析阶段。
handle_packet
def handle_packet(self, rxtx):d = 'rx' if (rxtx == RX) else 'tx'delim = self.options[d + '_packet_delim']plen = self.options[d + '_packet_len']if delim == -1 and plen == -1:return# Cache data values until we see the delimiter and/or the specified# packet length has been reached (whichever happens first).if len(self.packet_cache[rxtx]) == 0:self.ss_packet[rxtx] = self.startsample[rxtx]self.packet_cache[rxtx].append(self.datavalue[rxtx])if self.datavalue[rxtx] == delim or len(self.packet_cache[rxtx]) == plen:self.es_packet[rxtx] = self.samplenums = ''for b in self.packet_cache[rxtx]:s += self.format_value(b)if self.options['format'] != 'ascii':s += ' 'if self.options['format'] != 'ascii' and s[-1] == ' ':s = s[:-1] # Drop trailing space.self.putx_packet(rxtx, [Ann.RX_PACKET + rxtx, [s]])self.packet_cache[rxtx] = []
- 根据配置的分隔符或长度条件判断是否结束当前数据包的解析,并在满足条件时触发数据包的最终处理。
packet_cache[rxtx] | 临时存储数据包内容(如 [0x41, 0x42] )。 |
---|---|
ss_packet[rxtx] | 数据包起始样本点(第一个数据位的开始时间)。 |
es_packet[rxtx] | 数据包结束样本点(触发结束条件的当前样本点)。 |
rx_packet_delim | 接收方向的分隔符(如 10 表示换行符)。 |
tx_packet_len | 发送方向的固定数据包长度(如 64 字节)。 |
format_value(b) | 根据 format 配置(ASCII/HEX等)将字节转换为字符串。 |
确定配置参数
d = 'rx' if (rxtx == RX) else 'tx' # 生成配置键前缀(如 "rx_" 或 "tx_")
delim = self.options[d + '_packet_delim'] # 分隔符(十进制ASCII值)
plen = self.options[d + '_packet_len'] # 数据包固定长度
if delim == -1 and plen == -1:return # 未配置分隔符和长度,无需处理
- 分隔符:若配置为
-1
,表示不依赖分隔符结束数据包。 - 固定长度:若配置为
-1
,表示不依赖固定长度。 - 检查有效性:若两者均为
-1
,直接返回,数据包由其他逻辑(如停止位)触发结束。
缓存数据并记录起始样本点
if len(self.packet_cache[rxtx]) == 0:self.ss_packet[rxtx] = self.startsample[rxtx] # 记录数据包起始样本点
self.packet_cache[rxtx].append(self.datavalue[rxtx]) # 缓存当前数据值
- 首次缓存:若缓存为空,记录数据包的起始样本点(即第一个数据位的起始位置)。
- 逐帧累积:将当前解析出的数据值(如
0x41
)存入缓存列表。
判断数据包结束条件
if self.datavalue[rxtx] == delim or len(self.packet_cache[rxtx]) == plen:# 满足结束条件(分隔符或长度)self.es_packet[rxtx] = self.samplenum # 记录数据包结束样本点# 格式化数据包内容s = ''for b in self.packet_cache[rxtx]:s += self.format_value(b) # 根据配置(ASCII/HEX等)格式化值if self.options['format'] != 'ascii':s += ' ' # 非ASCII格式添加分隔符(如十六进制值之间加空格)if self.options['format'] != 'ascii' and s[-1] == ' ':s = s[:-1] # 移除末尾多余空格# 输出数据包注释self.putx_packet(rxtx, [Ann.RX_PACKET + rxtx, [s]])self.packet_cache[rxtx] = [] # 清空缓存
- 结束条件:
- 分隔符触发:当前数据值等于配置的分隔符(如
0x0A
表示换行符\n
)。 - 长度触发:缓存数据长度达到配置的固定值(如
64
字节)。
- 分隔符触发:当前数据值等于配置的分隔符(如
- 数据格式化:
- ASCII:直接拼接字符(如
b'\x41\x42'
→"AB"
)。 - 非ASCII(HEX/DEC/OCT/BIN):用空格分隔值(如
"41 42"
)。
- ASCII:直接拼接字符(如
- 输出与清理:
- 调用
putx_packet
输出数据包注释(如Ann.RX_PACKET
)。 - 清空缓存,准备接收下一数据包。
- 调用
get_data_bits
def get_data_bits(self, rxtx, signal):# Save the sample number of the middle of the first data bit.if self.startsample[rxtx] == -1:self.startsample[rxtx] = self.samplenumself.putg([Ann.RX_DATA_BIT + rxtx, ['%d' % signal]])# Store individual data bits and their start/end samplenumbers.s, halfbit = self.samplenum, int(self.bit_width / 2)self.databits[rxtx].append([signal, s - halfbit, s + halfbit])self.cur_frame_bit[rxtx] += 1# Return here, unless we already received all data bits.self.cur_data_bit[rxtx] += 1if self.cur_data_bit[rxtx] < self.options['data_bits']:return# Convert accumulated data bits to a data value.bits = [b[0] for b in self.databits[rxtx]]if self.options['bit_order'] == 'msb-first':bits.reverse()self.datavalue[rxtx] = bitpack(bits)self.putpx(rxtx, ['DATA', rxtx,(self.datavalue[rxtx], self.databits[rxtx])])b = self.datavalue[rxtx]formatted = self.format_value(b)if formatted is not None:self.putx(rxtx, [rxtx, [formatted]])bdata = b.to_bytes(self.bw, byteorder='big')self.putbin(rxtx, [Bin.RX + rxtx, bdata])self.putbin(rxtx, [Bin.RXTX, bdata])self.handle_packet(rxtx)self.databits[rxtx] = []self.advance_state(rxtx, signal)
format_value
def format_value(self, v):# Format value 'v' according to configured options.# Reflects the user selected kind of representation, as well as# the number of data bits in the UART frames.fmt, bits = self.options['format'], self.options['data_bits']# Assume "is printable" for values from 32 to including 126,# below 32 is "control" and thus not printable, above 127 is# "not ASCII" in its strict sense, 127 (DEL) is not printable,# fall back to hex representation for non-printables.if fmt == 'ascii':if v in range(32, 126 + 1):return chr(v)hexfmt = "[{:02X}]" if bits <= 8 else "[{:03X}]"return hexfmt.format(v)# Mere number to text conversion without prefix and padding# for the "decimal" output format.if fmt == 'dec':return "{:d}".format(v)# Padding with leading zeroes for hex/oct/bin formats, but# without a prefix for density -- since the format is user# specified, there is no ambiguity.if fmt == 'hex':digits = (bits + 4 - 1) // 4fmtchar = "X"elif fmt == 'oct':digits = (bits + 3 - 1) // 3fmtchar = "o"elif fmt == 'bin':digits = bitsfmtchar = "b"else:fmtchar = Noneif fmtchar is not None:fmt = "{{:0{:d}{:s}}}".format(digits, fmtchar)return fmt.format(v)return None
- 逐位接收数据,将其累积并转换为完整的数据值,同时生成可视化注释和结构化输出。其核心作用包括:
- 记录数据位的时序信息(起始/结束样本点)。
- 按配置的位顺序组合数据位(LSB-first 或 MSB-first)。
- 格式化数据值(如十六进制、ASCII)。
- 触发数据包处理逻辑(当数据位接收完成时)。
get_parity_bit
def get_parity_bit(self, rxtx, signal):self.paritybit[rxtx] = signalself.cur_frame_bit[rxtx] += 1if parity_ok(self.options['parity'], self.paritybit[rxtx],self.datavalue[rxtx], self.options['data_bits']):self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])self.putg([Ann.RX_PARITY_OK + rxtx, ['Parity bit', 'Parity', 'P']])else:# TODO: Return expected/actual parity values.self.putp(['PARITY ERROR', rxtx, (0, 1)]) # FIXME: Dummy tuple...self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])self.frame_valid[rxtx] = Falseself.advance_state(rxtx, signal)
- 用于处理 UART 协议中的 奇偶校验位,验证其正确性并根据结果更新帧状态。以下是分步解析:
- 记录校验位值:保存校验位的电平值。
- 校验验证:根据配置的校验类型(奇校验、偶校验等),验证校验位是否合法。
- 状态更新与输出:
- 若校验通过:输出校验位信息至日志和可视化界面。
- 若校验失败:标记帧为无效,输出错误信息。
- 状态转移:推进状态机至下一阶段(如停止位处理)
保存校验位值并更新位计数器
self.paritybit[rxtx] = signal
self.cur_frame_bit[rxtx] += 1
paritybit[rxtx]
:存储校验位的实际电平值(0
或1
)。cur_frame_bit[rxtx]
:递增帧内位计数器,表示已处理完校验位。
校验位验证
if parity_ok(self.options['parity'], self.paritybit[rxtx],self.datavalue[rxtx], self.options['data_bits']):# 校验成功self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])self.putg([Ann.RX_PARITY_OK + rxtx, ['Parity bit', 'Parity', 'P']])
else:# 校验失败self.putp(['PARITY ERROR', rxtx, (0, 1)]) # 虚拟元组,需完善self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])self.frame_valid[rxtx] = False
-
parity_ok()
函数:
根据校验类型、校验位值、数据值(datavalue
)和数据位数,返回校验结果。
示例逻辑:def parity_ok(parity_type, parity_bit, data, data_bits):if parity_type == 'none':return True # 无校验,始终通过# 计算数据位中1的个数ones = bin(data).count('1') if parity_type == 'even':return (ones + parity_bit) % 2 == 0elif parity_type == 'odd':return (ones + parity_bit) % 2 == 1# 其他校验类型(如 'zero', 'one')...
-
校验成功:
putp
:输出校验位信息到 Python 对象通道(如日志记录)。putg
:在逻辑分析仪界面标注校验位(如 “Parity bit”)。
-
校验失败:
- 输出错误信息到 Python 和注释通道。
- 标记帧无效(
frame_valid[rxtx] = False
),后续流程可能忽略此帧。 - 注意:代码中使用了虚拟元组
(0, 1)
,需替换为实际期望值与实际值(如(expected_parity, actual_parity)
)。
状态转移
self.advance_state(rxtx, signal)
-
作用:推进状态机至下一状态(如处理停止位)。
-
实现逻辑(伪代码):
def advance_state(self, rxtx, signal):if self.state[rxtx] == 'PARITY_CHECK':self.state[rxtx] = 'PROCESS_STOP_BITS'
get_stop_bits
def get_stop_bits(self, rxtx, signal):self.stopbits[rxtx].append(signal)self.cur_frame_bit[rxtx] += 1# Stop bits must be 1. If not, we report an error.if signal != 1:self.putp(['INVALID STOPBIT', rxtx, signal])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = Falseself.putp(['STOPBIT', rxtx, signal])self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])# Postprocess the UART frame after all STOP bits were seen.if len(self.stopbits[rxtx]) < self.options['stop_bits']:returnself.advance_state(rxtx, signal)
get_stop_bits
负责 处理停止位,验证其合法性并控制状态机流转。核心功能包括:
- 记录停止位值:将每个停止位的电平值存入缓存。
- 停止位有效性检查:验证停止位是否为高电平(
1
),否则标记帧无效。 - 状态机推进:当所有配置的停止位处理完成后,进入下一状态(如空闲状态)。
记录停止位值并更新位计数器
self.stopbits[rxtx].append(signal)
self.cur_frame_bit[rxtx] += 1
stopbits[rxtx]
:列表存储每个停止位的电平值(如[1, 1]
)。cur_frame_bit
:递增帧内位计数器,表示已处理完当前停止位。
验证停止位合法性
if signal != 1:self.putp(['INVALID STOPBIT', rxtx, signal])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = False
- 停止位规则:UART 协议要求停止位为高电平(
1
)。 - 错误处理:
putp
:输出错误信息到 Python 对象通道(如日志记录)。putg
:在逻辑分析仪界面标注错误(如Frame error
)。- 标记帧无效(
frame_valid[rxtx] = False
)。
输出停止位信息
self.putp(['STOPBIT', rxtx, signal])
self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])
- 日志与注释:无论停止位是否有效,均记录其存在(如
Stop bit
)。
检查停止位数量
if len(self.stopbits[rxtx]) < self.options['stop_bits']:return
self.advance_state(rxtx, signal)
- 数量判断:比较已接收的停止位数量与配置值(如
stop_bits=1.0
)。 - 潜在问题:
若stop_bits
为浮点数(如1.5
),len(stopbits[rxtx])
可能无法正确匹配(需特殊处理时间长度而非数量)。 - 状态转移:满足数量后调用
advance_state
,进入空闲状态或下一帧解析。
def handle_frame(self, rxtx, ss, es):# Pass the complete UART frame to upper layers.self.putpse(ss, es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])def handle_idle(self, rxtx, ss, es):self.putpse(ss, es, ['IDLE', rxtx, 0])def handle_break(self, rxtx, ss, es):self.putpse(ss, es, ['BREAK', rxtx, 0])self.putgse(ss, es, [Ann.RX_BREAK + rxtx,['Break condition', 'Break', 'Brk', 'B']])self.state[rxtx] = 'WAIT FOR START BIT'
get_wait_cond
def get_wait_cond(self, rxtx, inv):# Return condititions that are suitable for Decoder.wait(). Those# conditions either match the falling edge of the START bit, or# the sample point of the next bit time.state = self.state[rxtx]if state == 'WAIT FOR START BIT':return {rxtx: 'r' if inv else 'f'}if state in ('GET START BIT', 'GET DATA BITS','GET PARITY BIT', 'GET STOP BITS'):bitnum = self.cur_frame_bit[rxtx]# TODO: Currently does not support half STOP bits.want_num = ceil(self.get_sample_point(rxtx, bitnum))return {'skip': want_num - self.samplenum}
get_wait_cond
方法用于动态生成解码器的等待条件,确保协议解析过程能够精确同步到UART帧的各个阶段。
根据当前解析状态(如等待起始位、处理数据位等),返回以下两种条件之一:
- 边沿触发条件:用于检测起始位的下降沿(或上升沿,若信号反转)。
- 采样点跳转条件:指示解码器需要跳过的样本数,以对齐到下一个位的采样点。
取当前状态
state = self.state[rxtx]
- 作用:
根据方向(RX/TX)获取解析器的当前状态,如'WAIT FOR START BIT'
或'GET DATA BITS'
。
处理「等待起始位」状态
if state == 'WAIT FOR START BIT':return {rxtx: 'r' if inv else 'f'}
- 逻辑:
- 起始位是UART帧的开始,协议要求起始位为低电平(逻辑
0
)。 inv
参数的作用:- 若
inv = False
(默认):等待下降沿('f'
),即从空闲高电平跳变到起始位低电平。 - 若
inv = True
(信号反转):等待上升沿('r'
),即反转后的低电平变为高电平。
- 若
- 起始位是UART帧的开始,协议要求起始位为低电平(逻辑
- 返回值:
字典{RX: 'f'}
或{TX: 'r'}
,表示需要监测对应通道的边沿事件。
处理「数据/校验/停止位」状态
if state in ('GET START BIT', 'GET DATA BITS', 'GET PARITY BIT', 'GET STOP BITS'):bitnum = self.cur_frame_bit[rxtx]want_num = ceil(self.get_sample_point(rxtx, bitnum))return {'skip': want_num - self.samplenum}
- 逻辑:
- 获取当前位号:
bitnum
表示当前正在处理的帧内位索引(如起始位为0
,数据位从1
开始)。 - 计算目标样本点:
调用get_sample_point(rxtx, bitnum)
计算当前位的采样点(如位的中间位置)。- 示例:若位宽为
10
样本,采样点配置为50%
,则采样点为起始样本点 + 5
。 ceil
的作用:将浮点采样点向上取整,确保跳转到整数样本位置。
- 示例:若位宽为
- 计算需跳过的样本数:
want_num - self.samplenum
表示从当前样本点 (self.samplenum
) 到目标样本点 (want_num
) 需要跳过的样本数。
- 获取当前位号:
- 返回值:
字典{'skip': N}
,指示解码器跳过N
个样本后继续解析。
get_idle_cond
def get_idle_cond(self, rxtx, inv):# Return a condition that corresponds to the (expected) end of# the next frame, assuming that it will be an "idle frame"# (constant high input level for the frame's length).if self.idle_start[rxtx] is None:return Noneend_of_frame = self.idle_start[rxtx] + self.frame_len_sample_countif end_of_frame < self.samplenum:return Nonereturn {'skip': end_of_frame - self.samplenum}
- 在 UART 协议解析中用于检测空闲状态(帧间持续高电平),其作用是根据当前空闲开始时间计算需要跳过的样本数以确认空闲帧结束。
空闲状态检测条件
- 检查空闲开始时间:若
self.idle_start[rxtx]
为None
,表示未开始检测空闲,返回None
。 - 计算预期结束点:
end_of_frame = 空闲开始时间 + 一帧总样本数
。 - 超时判断:若当前样本
samplenum
已超过end_of_frame
,返回None
(空闲已结束)。 - 返回跳过样本数:若未超时,返回需跳过的样本数 `{‘skip’:
inspect_sample
def inspect_sample(self, rxtx, signal, inv):# Inspect a sample returned by .wait() for the specified UART line.if inv:signal = not signalstate = self.state[rxtx]if state == 'WAIT FOR START BIT':self.wait_for_start_bit(rxtx, signal)elif state == 'GET START BIT':self.get_start_bit(rxtx, signal)elif state == 'GET DATA BITS':self.get_data_bits(rxtx, signal)elif state == 'GET PARITY BIT':self.get_parity_bit(rxtx, signal)elif state == 'GET STOP BITS':self.get_stop_bits(rxtx, signal)
- 是 UART 协议解析器的核心驱动方法,根据当前解析状态(如等待起始位、处理数据位等),处理单个样本点并调用相应的子方法推进解析流程。
信号反转处理
if inv:signal = not signal
- 作用:根据
inv
配置调整电平逻辑,适配硬件差异。- 若
inv = True
:signal = 1
变为0
,signal = 0
变为1
。 - 应用场景:某些硬件(如 RS-232)使用负逻辑(高电平为
0
,低电平为1
),需反转信号。
- 若
状态路由与解析
state = self.state[rxtx]
if state == 'WAIT FOR START BIT':self.wait_for_start_bit(rxtx, signal)
elif state == 'GET START BIT':self.get_start_bit(rxtx, signal)
elif state == 'GET DATA BITS':self.get_data_bits(rxtx, signal)
elif state == 'GET PARITY BIT':self.get_parity_bit(rxtx, signal)
elif state == 'GET STOP BITS':self.get_stop_bits(rxtx, signal)
- 状态机驱动:根据当前状态调用对应的处理方法,各状态含义如下:
状态 | 调用的方法 | 作用 |
---|---|---|
WAIT FOR START BIT | wait_for_start_bit | 检测起始位下降沿,初始化帧解析状态。 |
GET START BIT | get_start_bit | 验证起始位有效性,重置数据缓存,准备接收数据位。 |
GET DATA BITS | get_data_bits | 逐位接收数据,组合为整数值,格式化输出。 |
GET PARITY BIT | get_parity_bit | 验证校验位合法性,标记帧有效性。 |
GET STOP BITS | get_stop_bits | 检查停止位是否为高电平,完成帧解析后推进至空闲状态。 |
执行流程示例
场景:接收一帧 8N1 数据(无校验,1停止位)
- 初始状态:
state = 'WAIT FOR START BIT'
- 检测到下降沿(反转后为低电平),调用
wait_for_start_bit
:- 记录起始位样本点,状态变为
'GET START BIT'
。
- 记录起始位样本点,状态变为
- 检测到下降沿(反转后为低电平),调用
- 处理起始位:
state = 'GET START BIT'
- 调用
get_start_bit
:- 验证起始位为低电平,状态变为
'GET DATA BITS'
。
- 验证起始位为低电平,状态变为
- 调用
- 接收数据位:
state = 'GET DATA BITS'
- 循环调用
get_data_bits
8次:- 每次读取一位数据,存储并更新数据值。
- 完成8位后,状态变为
'GET STOP BITS'
。
- 循环调用
- 处理停止位:
state = 'GET STOP BITS'
- 调用
get_stop_bits
:- 验证停止位为高电平,状态回到
'WAIT FOR START BIT'
。
- 验证停止位为高电平,状态回到
- 调用
inspect_edge
def inspect_edge(self, rxtx, signal, inv):# Inspect edges, independently from traffic, to detect break conditions.if inv:signal = not signalif not signal:# Signal went low. Start another interval.self.break_start[rxtx] = self.samplenumreturn# Signal went high. Was there an extended period with low signal?if self.break_start[rxtx] is None:returndiff = self.samplenum - self.break_start[rxtx]if diff >= self.break_min_sample_count:ss, es = self.frame_start[rxtx], self.samplenumself.handle_break(rxtx, ss, es)self.break_start[rxtx] = Nonedef inspect_idle(self, rxtx, signal, inv):# Check each edge and each period of stable input (either level).# Can derive the "idle frame period has passed" condition.if inv:signal = not signalif not signal:# Low input, cease inspection.self.idle_start[rxtx] = Nonereturn# High input, either just reached, or still stable.if self.idle_start[rxtx] is None:self.idle_start[rxtx] = self.samplenumdiff = self.samplenum - self.idle_start[rxtx]if diff < self.frame_len_sample_count:returnss, es = self.idle_start[rxtx], self.samplenumself.handle_idle(rxtx, ss, es)self.idle_start[rxtx] = es
- 通过检测信号边沿来识别 BREAK 条件(持续低电平超过一帧时间)
-
信号反转:
if inv:signal = not signal
- 适配硬件电平逻辑(如 RS-232 使用负逻辑)。
-
检测下降沿(低电平):
if not signal:self.break_start[rxtx] = self.samplenumreturn
- 当信号变为低电平时,记录起始样本点
break_start[rxtx]
。
- 当信号变为低电平时,记录起始样本点
-
检测上升沿(高电平):
if self.break_start[rxtx] is None:return diff = self.samplenum - self.break_start[rxtx] if diff >= self.break_min_sample_count:ss, es = self.frame_start[rxtx], self.samplenumself.handle_break(rxtx, ss, es) self.break_start[rxtx] = None
-
当信号恢复高电平时,计算低电平持续时间
diff
。 -
若
diff
超过阈值break_min_sample_count
(通常为一帧时间的样本数),调用handle_break
处理 BREAK 条件。 -
潜在问题:
ss
错误地使用frame_start[rxtx]
(应为break_start[rxtx]
),需修正为:ss, es = self.break_start[rxtx], self.samplenum
-
- 检测 空闲状态(持续高电平超过一帧时间),用于验证帧间间隔的合规性。
-
信号反转:
if inv:signal = not signal
-
低电平处理:
if not signal:self.idle_start[rxtx] = Nonereturn
- 检测到低电平时,重置空闲起始点
idle_start[rxtx]
。
- 检测到低电平时,重置空闲起始点
-
高电平处理:
if self.idle_start[rxtx] is None:self.idle_start[rxtx] = self.samplenum diff = self.samplenum - self.idle_start[rxtx] if diff < self.frame_len_sample_count:return ss, es = self.idle_start[rxtx], self.samplenum self.handle_idle(rxtx, ss, es) self.idle_start[rxtx] = es
- 持续高电平时,计算持续时间
diff
。 - 若超过阈值
frame_len_sample_count
(通常为一帧时间),调用handle_idle
处理空闲状态。 - 潜在问题:更新
idle_start[rxtx] = es
可能导致后续空闲段被分割。建议改为idle_start[rxtx] = None
以重新检测连续空闲。
- 持续高电平时,计算持续时间
decode
def decode(self):if not self.samplerate:raise SamplerateError('Cannot decode without samplerate.')has_pin = [self.has_channel(ch) for ch in (RX, TX)]if not True in has_pin:raise ChannelError('Need at least one of TX or RX pins.')opt = self.optionsinv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']cond_data_idx = [None] * len(has_pin)# Determine the number of samples for a complete frame's time span.# A period of low signal (at least) that long is a break condition.frame_samples = 1 # STARTframe_samples += self.options['data_bits']frame_samples += 0 if self.options['parity'] == 'none' else 1frame_samples += self.options['stop_bits']frame_samples *= self.bit_widthself.frame_len_sample_count = ceil(frame_samples)self.break_min_sample_count = self.frame_len_sample_countcond_edge_idx = [None] * len(has_pin)cond_idle_idx = [None] * len(has_pin)while True:conds = []if has_pin[RX]:cond_data_idx[RX] = len(conds)conds.append(self.get_wait_cond(RX, inv[RX]))cond_edge_idx[RX] = len(conds)conds.append({RX: 'e'})cond_idle_idx[RX] = Noneidle_cond = self.get_idle_cond(RX, inv[RX])if idle_cond:cond_idle_idx[RX] = len(conds)conds.append(idle_cond)if has_pin[TX]:cond_data_idx[TX] = len(conds)conds.append(self.get_wait_cond(TX, inv[TX]))cond_edge_idx[TX] = len(conds)conds.append({TX: 'e'})cond_idle_idx[TX] = Noneidle_cond = self.get_idle_cond(TX, inv[TX])if idle_cond:cond_idle_idx[TX] = len(conds)conds.append(idle_cond)(rx, tx) = self.wait(conds)if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:self.inspect_sample(RX, rx, inv[RX])if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:self.inspect_edge(RX, rx, inv[RX])self.inspect_idle(RX, rx, inv[RX])if cond_idle_idx[RX] is not None and self.matched[cond_idle_idx[RX]]:self.inspect_idle(RX, rx, inv[RX])if cond_data_idx[TX] is not None and self.matched[cond_data_idx[TX]]:self.inspect_sample(TX, tx, inv[TX])if cond_edge_idx[TX] is not None and self.matched[cond_edge_idx[TX]]:self.inspect_edge(TX, tx, inv[TX])self.inspect_idle(TX, tx, inv[TX])if cond_idle_idx[TX] is not None and self.matched[cond_idle_idx[TX]]:self.inspect_idle(TX, tx, inv[TX])
- UART 协议解析器的入口方法,负责协调整个解析流程。其核心任务包括:
- 初始化校验:确保采样率和通道配置有效。
- 计算帧参数:确定帧长度和 BREAK 条件的最小样本数。
- 事件循环:监听 RX/TX 通道的各类事件(数据、边沿、空闲),并调用对应的处理方法。
初始校验
if not self.samplerate:raise SamplerateError('Cannot decode without samplerate.')has_pin = [self.has_channel(ch) for ch in (RX, TX)]
if not True in has_pin:raise ChannelError('Need at least one of TX or RX pins.')
- 校验采样率:必须配置采样率以计算时间相关参数。
- 校验通道:至少启用 RX 或 TX 通道。
信号反转配置
opt = self.options
inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
inv
列表:标记 RX 和 TX 是否需要信号反转。例如,inv[RX] = True
表示 RX 通道的电平逻辑需取反。
帧参数计算
frame_samples = 1 # START
frame_samples += self.options['data_bits']
frame_samples += 0 if self.options['parity'] == 'none' else 1
frame_samples += self.options['stop_bits']
frame_samples *= self.bit_width
self.frame_len_sample_count = ceil(frame_samples)
self.break_min_sample_count = self.frame_len_sample_count
-
帧长度计算:
- 起始位:固定 1 位。
- 数据位:
data_bits
位。 - 校验位:根据配置(
parity
)决定是否添加。 - 停止位:直接累加
stop_bits
(可能为浮点数,如 1.5)。 - 总样本数:总位数 × 位宽(
bit_width
)。
-
问题:浮点停止位直接累加到整数
frame_samples
会导致精度丢失。
改进建议:frame_samples = (1 + # STARTself.options['data_bits'] +(0 if self.options['parity'] == 'none' else 1) +self.options['stop_bits'] ) * self.bit_width self.frame_len_sample_count = ceil(frame_samples)
主事件循环
while True:conds = []# 为 RX 和 TX 通道构建等待条件if has_pin[RX]:# 数据条件(如等待起始位或采样点)cond_data_idx[RX] = len(conds)conds.append(self.get_wait_cond(RX, inv[RX]))# 边沿条件(检测任何边沿)cond_edge_idx[RX] = len(conds)conds.append({RX: 'e'})# 空闲条件(持续高电平)cond_idle_idx[RX] = Noneidle_cond = self.get_idle_cond(RX, inv[RX])if idle_cond:cond_idle_idx[RX] = len(conds)conds.append(idle_cond)# 同理处理 TX 通道...# 等待任一条件触发(rx, tx) = self.wait(conds)# 处理 RX 通道触发的事件if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:self.inspect_sample(RX, rx, inv[RX])if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:self.inspect_edge(RX, rx, inv[RX])self.inspect_idle(RX, rx, inv[RX])if cond_idle_idx[RX] is not None and self.matched[cond_idle_idx[RX]]:self.inspect_idle(RX, rx, inv[RX])# 同理处理 TX 通道触发的事件...
步骤分解
- 构建等待条件列表
conds
:- 数据条件:由
get_wait_cond
返回,如等待起始位下降沿或数据采样点。 - 边沿条件:监听通道的任何边沿事件(
'e'
)。 - 空闲条件:由
get_idle_cond
返回,如持续高电平超时。
- 数据条件:由
- 等待事件触发:
self.wait(conds)
阻塞直到任一条件满足,返回当前 RX/TX 信号值。
- 处理触发事件:
- 数据条件:调用
inspect_sample
解析数据位。 - 边沿条件:调用
inspect_edge
检测 BREAK 条件,并检查空闲状态。 - 空闲条件:直接调用
inspect_idle
。
- 数据条件:调用
方法 | 作用 |
---|---|
get_wait_cond(rxtx, inv) | 返回当前状态下的等待条件(如边沿或采样点跳转)。 |
get_idle_cond(rxtx, inv) | 返回检测空闲状态的条件(如持续高电平超过阈值)。 |
inspect_sample | 解析数据位、校验位或停止位。 |
inspect_edge | 检测 BREAK 条件(长低电平)。 |
inspect_idle | 处理空闲状态(持续高电平)。 |