[GN] Uart协议解码器源码各个方法

系列文章目录

sigrokdecode 模块学习指南 — 准备阶段
通讯协议 - Uart
sigrokdecode 模块
UART协议解码器源码解析
Uart协议解码器源码各个方法


`

文章目录

  • 系列文章目录
  • 引入库
  • parity_ok
  • 注解类型枚举
  • options参数
  • annotations 注解
  • annotation_rows 注解分组
    • 接收(RX)相关行
    • 发送(TX)相关行
  • put 输出方法
    • putx(self, rxtx, data)
    • putx_packet(self, rxtx, data)
    • putpx(self, rxtx, data)
    • putg(self, data)
    • putp(self, data)
    • putgse(self, ss, es, data)
    • 工作流程示例
  • reset
    • 示例场景
  • start
    • Python对象通道
        • 定义与用途
        • 数据形式
    • 逻辑分析仪注释通道
        • **定义与用途**
        • **数据形式**
    • 示例场景
  • wait_for_start_bit
  • get_start_bit
    • 记录起始位信号
    • 验证起始位有效性
  • handle_packet
    • 确定配置参数
    • 缓存数据并记录起始样本点
    • 判断数据包结束条件
  • get_data_bits
  • format_value
  • get_parity_bit
    • 保存校验位值并更新位计数器
    • 校验位验证
    • 状态转移
  • get_stop_bits
    • 记录停止位值并更新位计数器
    • 验证停止位合法性
    • 输出停止位信息
    • 检查停止位数量
  • get_wait_cond
    • 取当前状态
    • 处理「等待起始位」状态
    • 处理「数据/校验/停止位」状态
  • get_idle_cond
  • inspect_sample
    • 信号反转处理
    • 状态路由与解析
    • 执行流程示例
  • inspect_edge
  • decode
    • 初始校验
    • 信号反转配置
    • 帧参数计算
    • 主事件循环
    • 步骤分解


'''
输出格式说明(OUTPUT_PYTHON):
每个数据包格式为 [<ptype>, <rxtx>, <pdata>]
<ptype> 类型包括:- 'STARTBIT': 起始位值(0/1)- 'DATA': 数据值和各比特位的起止样本号- 'PARITYBIT': 校验位值- 'STOPBIT': 停止位值- 'INVALID ...': 无效位错误- 'PARITY ERROR': 校验错误- 'BREAK': 线路中断- 'FRAME': 完整帧数据及有效性- 'IDLE': 空闲状态
<rxtx> 表示方向:0(RX接收)或1(TX发送)
'''
''' DATA = (13, [(1, ss=200, es=250), (0, ss=150, es=200), (1, ss=100, es=150), (1, ss=50, es=100)])
LSB到MSB顺序:传输顺序为 1 (LSB) → 0 → 1 → 1 (MSB)。'''

引入库

# 引入必要的库
import sigrokdecode as srd  # Sigrok 解码器基础库
from common.srdhelper import bitpack  # 辅助函数:将位列表转为整数值
from math import floor, ceil  # 数学函数

parity_ok

# 校验位检查函数
def parity_ok(parity_type, parity_bit, data, data_bits):#  忽略校验:直接返回Trueif parity_type == 'ignore': return True# 根据校验类型判断校验位是否正确if parity_type == 'zero': return parity_bit == 0elif parity_type == 'one': return parity_bit == 1# 计算1的个数(数据+校验位)ones = bin(data).count('1') + parity_bit  # 奇校验:1的总数为奇数if parity_type == 'odd': return (ones % 2) == 1elif parity_type == 'even': return (ones % 2) == 0
参数含义取值范围作用
parity_type校验类型ignore/zero/one/odd/even决定校验规则
parity_bit接收到的校验位值01需根据校验类型验证合法性
data数据整数值data_bits决定(如0~255)提供原始数据用于计算1的数量
data_bits数据位长度5~9确保正确解析数据的二进制位数
  • 函数用于验证 UART 协议中的奇偶校验位是否正确。
  1. 忽略校验:直接返回 True
  2. 固定校验值:强制校验位为 01
  3. 奇偶校验:统计数据位和校验位中 1 的总数

注解类型枚举

# 自定义异常类
class SamplerateError(Exception): pass  # 采样率未设置错误
class ChannelError(Exception): pass     # 通道未配置错误# 注解类型枚举(用于在GUI中高亮显示UART帧的不同部分(如起始位、数据位)。)
class Ann:RX_DATA, TX_DATA, RX_START, TX_START, RX_PARITY_OK, TX_PARITY_OK, \RX_PARITY_ERR, TX_PARITY_ERR, RX_STOP, TX_STOP, RX_WARN, TX_WARN, \RX_DATA_BIT, TX_DATA_BIT, RX_BREAK, TX_BREAK, RX_PACKET, TX_PACKET = range(18)# 二进制输出类型 原始数据的转储,便于后续分析
class Bin:RX, TX, RXTX = range(3)  # 分别对应接收、发送、双向数据
  • range(3):生成一个整数序列 [0, 1, 2]
RX_DATA0接收端完整数据字节标注接收到的完整数据字节(如 0x55
TX_DATA1发送端完整数据字节标注发送的完整数据字节
RX_START2接收起始位(Start Bit)标注接收通道的起始位(低电平信号)
TX_START3发送起始位标注发送通道的起始位
RX_PARITY_OK4接收校验正确校验位验证通过时的标注(如偶校验正确)
TX_PARITY_OK5发送校验正确发送端校验正确
RX_PARITY_ERR6接收校验错误校验位验证失败时的错误标注
TX_PARITY_ERR7发送校验错误发送端校验错误
RX_STOP8接收停止位(Stop Bit)标注接收通道的停止位(高电平信号)
TX_STOP9发送停止位标注发送通道的停止位
RX_WARN10接收警告接收异常(如停止位电平错误)
TX_WARN11发送警告发送端异常
RX_DATA_BIT12接收的单个数据位标注接收的每个二进制位(如 01
TX_DATA_BIT13发送的单个数据位标注发送的每个二进制位
RX_BREAK14接收线路中断(BREAK 状态)检测到线路长时间低电平(BREAK 信号)
TX_BREAK15发送线路中断发送端 BREAK 信号
RX_PACKET16接收完整数据包标注多字节组成的完整数据包(如 [0xAA, 0x55]
TX_PACKET17发送完整数据包标注发送的完整数据包

options参数

 # 用户可配置参数options = ({'id': 'baudrate', 'desc': '波特率', 'default': 115200},{'id': 'data_bits', 'desc': '数据位', 'default': 8, 'values': (5, 6, 7, 8, 9)},{'id': 'parity', 'desc': '校验类型', 'default': 'none', 'values': ('none', 'odd', 'even', 'zero', 'one', 'ignore')},{'id': 'stop_bits', 'desc': '停止位', 'default': 1.0, 'values': (0.0, 0.5, 1.0, 1.5, 2.0)},{'id': 'bit_order', 'desc': '位顺序', 'default': 'lsb-first', 'values': ('lsb-first', 'msb-first')},{'id': 'format', 'desc': '数据格式', 'default': 'hex', 'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},# ... 其他选项(省略) ...)
参数ID (id)描述 (desc)默认值 (default)可选值 (values)说明
baudrate波特率115200-每秒传输的符号数(常用值如9600、115200)。
data_bits数据位8(5, 6, 7, 8, 9)每个数据帧的位数(通常为8位)。
parity校验类型‘none’(‘none’, ‘odd’, ‘even’, ‘zero’, ‘one’, ‘ignore’)校验位规则(见奇偶校验详解)。
stop_bits停止位1.0(0.0, 0.5, 1.0, 1.5, 2.0)停止位的持续时间(通常为1或2位时间)。
bit_order位传输顺序‘lsb-first’(‘lsb-first’, ‘msb-first’)LSB(最低有效位)先传输是UART标准。
format数据格式‘hex’(‘ascii’, ‘dec’, ‘hex’, ‘oct’, ‘bin’)数据以ASCII、十进制、十六进制、八进制或二进制格式显示。
invert_rx反转RX信号电平‘no’(‘yes’, ‘no’)若为’yes’,接收信号电平逻辑取反(高→低,低→高)。
invert_tx反转TX信号电平‘no’(‘yes’, ‘no’)若为’yes’,发送信号电平逻辑取反。
sample_point采样点位置(百分比)50-在位的中间位置(50%)采样以提高抗噪性。
rx_packet_delimRX数据包分隔符(十进制)-1-分隔符的ASCII值(如-1表示无分隔符,10表示换行符)。
tx_packet_delimTX数据包分隔符(十进制)-1-同RX分隔符,用于发送。
rx_packet_lenRX数据包长度-1-固定长度数据包(如-1表示可变长度)。
tx_packet_lenTX数据包长度-1-同RX数据包长度。

annotations 注解

# 注解定义(界面显示的文本标签)annotations = (('rx-data', '接收数据'), ('tx-data', '发送数据'),('rx-start', '接收起始位'), ('tx-start', '发送起始位'),# ... 其他注解(省略) ...)
标识符 (ID)描述 (Description)含义与应用场景
rx-dataRX data接收数据:标识接收到的数据位部分,通常对应数据帧的有效信息。
tx-dataTX data发送数据:标识发送的数据位部分,标记正在传输的有效信息。
rx-startRX start bit接收起始位:标识接收到的起始位(低电平),用于同步数据接收。
tx-startTX start bit发送起始位:标识发送的起始位(低电平),标志数据帧开始传输。
rx-parity-okRX parity OK bit接收校验成功:校验位验证通过(奇偶校验正确),数据无错误。
tx-parity-okTX parity OK bit发送校验成功:发送端计算的校验位符合规则,数据已正确添加校验。
rx-parity-errRX parity error bit接收校验失败:校验位验证失败,数据可能存在传输错误。
tx-parity-errTX parity error bit发送校验失败:发送端校验计算异常(通常为配置错误或逻辑问题)。
rx-stopRX stop bit接收停止位:标识接收到的停止位(高电平),标志数据帧结束。
tx-stopTX stop bit发送停止位:标识发送的停止位(高电平),确保数据帧结束。
rx-warningRX warning接收警告:指示接收过程中的非致命异常(如波特率轻微偏差、噪声干扰)。
tx-warningTX warning发送警告:指示发送过程中的潜在问题(如缓冲区溢出、发送超时)。
rx-data-bitRX data bit接收数据位:细分到单个数据位的接收状态,用于位级调试。
tx-data-bitTX data bit发送数据位:细分到单个数据位的发送状态,用于位级分析。
rx-breakRX break接收中断:检测到线路持续低电平超过一帧时间(BREAK条件),可能表示通信终止或复位信号。
tx-breakTX break发送中断:主动发送BREAK信号,用于通知接收端通信中断。
rx-packetRX packet接收数据包:标识完整接收到的数据包(包含起始位、数据位、校验位、停止位)。
tx-packetTX packet发送数据包:标识完整发送的数据包,用于跟踪完整帧的传输过程。

annotation_rows 注解分组

# 注解分组(在界面中归类显示)annotation_rows = (('rx-data-bits', '接收位', (Ann.RX_DATA_BIT,)),('rx-data-vals', '接收数据', (Ann.RX_DATA, Ann.RX_START, Ann.RX_PARITY_OK, Ann.RX_PARITY_ERR, Ann.RX_STOP)),# ... 其他分组(省略) ...)
行分类核心作用典型使用场景
数据位行位级调试(时序、电平)硬件信号完整性分析
数据帧行完整帧解析(起始、数据、校验、停止)协议合规性验证
警告/错误行异常检测与诊断通信稳定性测试
BREAK行中断信号处理设备复位或通信终止检测
数据包行聚合帧数据展示应用层数据处理(如ASCII消息解析)

接收(RX)相关行

行ID行描述包含的注解常量作用
rx-data-bitsRX bitsAnn.RX_DATA_BIT显示每个接收数据位的详细信息(如位值、时序)。
rx-data-valsRX dataAnn.RX_DATA, Ann.RX_START, Ann.RX_PARITY_OK, Ann.RX_PARITY_ERR, Ann.RX_STOP标记完整接收帧(起始位、数据、校验、停止位)。
rx-warningsRX warningsAnn.RX_WARN接收过程中的警告(如波特率偏差、噪声干扰)。
rx-breaksRX breaksAnn.RX_BREAK接收到的BREAK信号(持续低电平中断)。
rx-packetsRX packetsAnn.RX_PACKET完整接收的数据包(包含所有位的聚合信息)。

发送(TX)相关行

行ID行描述包含的注解常量作用
tx-data-bitsTX bitsAnn.TX_DATA_BIT显示每个发送数据位的详细信息。
tx-data-valsTX dataAnn.TX_DATA, Ann.TX_START, Ann.TX_PARITY_OK, Ann.TX_PARITY_ERR, Ann.TX_STOP标记完整发送帧(起始位、数据、校验、停止位)。
tx-warningsTX warningsAnn.TX_WARN发送过程中的警告(如缓冲区溢出、发送超时)。
tx-breaksTX breaksAnn.TX_BREAK主动发送的BREAK信号。
tx-packetsTX packetsAnn.TX_PACKET完整发送的数据包。

put 输出方法

	def putx(self, rxtx, data):s, halfbit = self.startsample[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)def putx_packet(self, rxtx, data):s, halfbit = self.ss_packet[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)def putpx(self, rxtx, data):s, halfbit = self.startsample[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_python, data)def putg(self, data):s, halfbit = self.samplenum, self.bit_width / 2.0self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)def putp(self, data):s, halfbit = self.samplenum, self.bit_width / 2.0self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)def putgse(self, ss, es, data):self.put(ss, es, self.out_ann, data)
方法名核心用途输出通道时间范围计算方式
putx标注方向性单一位事件逻辑分析仪注释基于通道起始采样点扩展半位宽
putx_packet标注完整数据包逻辑分析仪注释基于数据包起始采样点扩展半位宽
putpx输出方向性结构化数据Python对象putx
putg标注当前瞬时事件逻辑分析仪注释以当前采样点为中心扩展半位宽
putp输出当前瞬时结构化数据Python对象putg
putgse自定义时间范围标注逻辑分析仪注释直接指定起止采样点

putx(self, rxtx, data)

  • 功能:标注单个数据位或控制位(如起始位、停止位)的时间范围,基于通道(RX/TX)的起始采样点。
  • 参数
    • rxtx:方向标识,取值为 Bin.RX(接收)或 Bin.TX(发送)。
    • data:要输出的注解数据(如 (Ann.RX_DATA_BIT, 'Bit: 1'))。
  • 时间计算
    • 起始时间startsample[rxtx] - floor(halfbit)
      (从通道的起始采样点向前扩展半位宽)
    • 结束时间self.samplenum + ceil(halfbit)
      (当前采样点向后扩展半位宽)
  • 输出目标self.out_ann(逻辑分析仪注释通道)。
  • 应用场景
    标记接收或发送的单个数据位,确保标注覆盖整个位的持续时间。
    示例:在解析到第3个数据位时,调用 putx(Bin.RX, (Ann.RX_DATA_BIT, 'Bit: 1'))

putx_packet(self, rxtx, data)

  • 功能:标注完整数据包(如整个UART帧)的时间范围,基于数据包的起始采样点。
  • 参数
    • rxtx:方向标识(同 putx)。
    • data:数据包相关注解(如 (Ann.RX_PACKET, 'Packet: 0x41'))。
  • 时间计算
    • 起始时间ss_packet[rxtx] - floor(halfbit)
      (数据包的起始采样点向前扩展半位宽)
    • 结束时间self.samplenum + ceil(halfbit)
      (当前采样点向后扩展半位宽)
  • 输出目标self.out_ann
  • 应用场景
    标记完整的接收或发送数据包(包含起始位、数据位、校验位、停止位)。
    示例:完成一帧解析后,调用 putx_packet(Bin.TX, (Ann.TX_PACKET, 'Data: A'))

putpx(self, rxtx, data)

  • 功能:与 putx 类似,但输出到Python对象通道,用于生成结构化数据。
  • 参数:同 putx
  • 输出目标self.out_python(Python对象输出通道)。
  • 应用场景
    将解码后的原始数据(如字节值、校验结果)传递给其他Python模块处理,而非可视化展示。
    示例:发送数据时,调用 putpx(Bin.TX, {'type': 'data', 'value': 0x41})

putg(self, data)

  • 功能:以当前采样点为中心,标注单个事件(如警告、校验结果)的时间范围
  • 参数
    • data:注解数据(如 (Ann.RX_PARITY_ERR, 'Parity Error'))。
  • 时间计算
    • 起始时间self.samplenum - floor(halfbit)
      (当前采样点向前扩展半位宽)
    • 结束时间self.samplenum + ceil(halfbit)
      (当前采样点向后扩展半位宽)
  • 输出目标self.out_ann
  • 应用场景
    标记瞬时事件(如校验错误、警告),确保标注在波形中居中显示。
    示例:检测到校验错误时,调用 putg((Ann.RX_PARITY_ERR, 'Parity Error'))

putp(self, data)

  • 功能:类似 putg,但输出到Python对象通道。
  • 参数:同 putg
  • 输出目标self.out_python
  • 应用场景
    将瞬时事件的结构化数据传递给其他处理逻辑。
    示例:发送BREAK信号时,调用 putp({'event': 'break', 'direction': 'rx'})

putgse(self, ss, es, data)

  • 功能:自定义时间范围标注,直接指定起始和结束采样点。

  • 参数

    • ss:起始采样点(整数)。
    • es:结束采样点(整数)。
    • data:注解数据。
  • 输出目标self.out_ann

  • 应用场景
    处理非标准时间范围的事件(如跨越多个位的BREAK信号)。
    示例:标注一个持续低电平的BREAK信号:

    self.putgse(ss=100, es=200, data=(Ann.RX_BREAK, 'BREAK Detected'))
    

工作流程示例

  1. 检测起始位:当RX线路变为低电平时,记录 startsample[Bin.RX] = 当前采样点
  2. 解析数据位:逐个采样点读取数据位,调用 putx(Bin.RX, ...) 标注每个位。
  3. 校验与停止位:校验完成后,调用 putg(...) 标记校验结果。
  4. 完成数据包:调用 putx_packet(Bin.RX, ...) 输出完整帧信息。
  5. 输出到Python:同时调用 putpxputp 传递结构化数据。

reset

def reset(self):self.samplerate = Noneself.frame_start = [-1, -1]self.frame_valid = [None, None]self.cur_frame_bit = [None, None]self.startbit = [-1, -1]self.cur_data_bit = [0, 0]self.datavalue = [0, 0]self.paritybit = [-1, -1]self.stopbits = [[], []]self.startsample = [-1, -1]self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']self.databits = [[], []]self.break_start = [None, None]self.packet_cache = [[], []]self.ss_packet, self.es_packet = [None, None], [None, None]self.idle_start = [None, None]
变量初始值用途初始值设计逻辑
samplerateNone存储采样率(单位:Hz)未配置前未知,需外部设置。
frame_start[-1, -1]接收(RX)和发送(TX)方向帧的起始样本点。-1 表示未检测到帧起始。
frame_valid[None, None]标记当前帧是否有效(校验通过)。None 表示帧未解析完成或未验证。
cur_frame_bit[None, None]当前处理的帧中的位数(如起始位=0,数据位=1~N)。None 表示未开始解析帧。
startbit[-1, -1]起始位的样本点位置。-1 表示未检测到起始位。
cur_data_bit[0, 0]当前处理的数据位索引(从0开始,对应LSB)。数据位从第0位开始解析。
datavalue[0, 0]当前帧的整数值(通过数据位计算)。初始值为0,解析时按位累加。
paritybit[-1, -1]校验位的值(0或1)。-1 表示未解析到校验位。
stopbits[[], []]停止位的样本点范围列表(可能包含多个停止位)。空列表表示未检测到停止位。
startsample[-1, -1]帧的起始样本点(含起始位)。-1 表示帧未开始。
state['WAIT FOR START BIT', 'WAIT FOR START BIT']解析器的状态机状态(RX和TX方向)。空闲时持续检测起始位,符合UART协议逻辑。
databits[[], []]解析出的数据位列表(如 [1, 0, 1])。空列表表示未解析到数据位。
break_start[None, None]BREAK信号(持续低电平)的起始样本点。None 表示未检测到BREAK。
packet_cache[[], []]临时存储数据包内容(如多帧聚合)。空列表表示无缓存数据。
ss_packet, es_packet[None, None], [None, None]数据包的起始和结束样本点。None 表示数据包未开始或未完成。
idle_start[None, None]进入空闲状态的起始样本点(用于超时检测)。None 表示当前未处于空闲状态。

示例场景

接收一帧8位数据(值 0x41,校验通过)

  1. 起始位startbit[0] = 100startsample[0] = 100state[0] = 'PROCESSING DATA BITS'
  2. 数据位:逐位解析,cur_data_bit[0] 从0递增到7,databits[0] = [1, 0, 0, 0, 0, 0, 1, 0]datavalue[0] = 0x41
  3. 校验位paritybit[0] = 1,校验计算通过,frame_valid[0] = True
  4. 停止位stopbits[0] = [180, 200]
  5. 数据包记录ss_packet[0] = 100es_packet[0] = 200packet_cache[0].append(0x41)

start

 def start(self):self.out_python = self.register(srd.OUTPUT_PYTHON)self.out_binary = self.register(srd.OUTPUT_BINARY)self.out_ann = self.register(srd.OUTPUT_ANN)self.bw = (self.options['data_bits'] + 7) // 8
  • 作用:向协议解析框架(如 sigrok)注册三类输出通道,用于分发解析结果。

  • 作用:根据配置的 数据位长度data_bits),计算存储数据所需的 最小字节数

Python对象通道

定义与用途
  • 作用:将协议解析后的结构化数据(如完整数据包、校验结果、原始字节)传递给其他Python模块,用于自动化处理、存储或分析
  • 目标用户:软件开发工程师、自动化测试脚本,用于程序化数据消费。
数据形式
  • 数据结构:通常为 字典(Dict)自定义类实例,包含机器可读的字段:

    • 事件类型:如 'data''error''packet'
    • 数据内容:如字节值、ASCII字符串、十六进制数值。
    • 元数据:时间戳、方向(RX/TX)、校验状态等。
  • 示例

    # 发送一个解析后的数据包到Python对象通道
    packet_data = {'direction': 'rx',          # 数据方向'timestamp': 1630450000.5,  # 时间戳'data': b'\x41',            # 原始字节(ASCII 'A')'parity_ok': True,          # 校验结果'length': 8,                # 数据位长度
    }
    self.put(output=self.out_python, data=packet_data)
    

逻辑分析仪注释通道

定义与用途
  • 作用:将协议解析后的关键事件(如数据位、起始位、校验结果等)以可视化注释的形式标注在逻辑分析仪的波形界面上,帮助用户直观理解信号含义。
  • 目标用户:硬件工程师、调试人员,用于实时或离线分析信号时序和协议合规性。
数据形式
  • 数据结构:通常为元组(Tuple)特定对象,包含以下信息:

    • 时间范围:起始和结束的采样点(或时间戳),确定注释在波形上的显示范围。
    • 注释类型:预定义的常量(如 Ann.RX_DATA 表示接收数据),用于分类事件。
    • 描述信息:可读的文本或数值(如数据位的值 'Bit: 1')。
  • 示例

    # 标注一个接收数据位(值1)的时间范围和类型
    annotation = (Ann.RX_DATA_BIT,  # 注释类型(常量)'Bit: 1',         # 描述信息
    )
    self.put(ss=100, es=150, output=self.out_ann, data=annotation)
    
    • 在波形界面中,从样本点100到150的区间内显示“Bit: 1”。
 def metadata(self, key, value):if key == srd.SRD_CONF_SAMPLERATE:self.samplerate = value# The width of one UART bit in number of samples.self.bit_width = float(self.samplerate) / float(self.options['baudrate'])
  1. self.samplerate

    • 作用:存储当前采样率(单位:Hz),用于后续时间与样本点转换。
    • 示例samplerate = 1_000_000 表示每秒采集 100 万个样本。
  2. self.bit_width

    • 作用:表示每个 UART 位占用的样本数,直接影响解析精度和时序计算。

    • 计算逻辑

      • 采样率(samplerate)决定时间分辨率。
      • 波特率(baudrate)决定每秒传输的位数。
      • 二者的比值即为每个位的样本数。
    • 示例

      • samplerate=12 MHzbaudrate=115200,则:

        bit_width=12 000 000/115 200≈104.17 个样本/位

 def get_sample_point(self, rxtx, bitnum):# Determine absolute sample number of a bit slot's sample point.# Counts for UART bits start from 0 (0 = start bit, 1..x = data,# x+1 = parity bit (if used) or the first stop bit, and so on).# Accept a position in the range of 1-99% of the full bit width.# Assume 50% for invalid input specs for backwards compatibility.perc = self.options['sample_point'] or 50if not perc or perc not in range(1, 100):perc = 50perc /= 100.0bitpos = (self.bit_width - 1) * percbitpos += self.frame_start[rxtx]bitpos += bitnum * self.bit_widthreturn bitpos
  • 目的:根据配置的采样点百分比,确定某一位(如起始位、数据位、校验位)的采样位置(绝对样本数)。
rxtx枚举通信方向(RXTX),用于获取对应方向的帧起始样本点。
bitnum整数位编号(0 = 起始位,1~N = 数据位,N+1 = 校验位/停止位)。
  1. 获取采样点百分比

    perc = self.options['sample_point'] or 50
    if not perc or perc not in range(1, 100):perc = 50
    perc /= 100.0
    
    • 默认值处理:若 sample_point 无效(非 1-99),默认使用 50%。
    • 百分比转换:将百分比转换为小数(如 50% → 0.5)。
  2. 计算位内采样偏移

    bitpos = (self.bit_width - 1) * perc
    
    • 位宽调整self.bit_width - 1 确保采样点在位的时间范围内(如位宽为 10 样本时,采样点范围是 0~9)。
  3. 确定绝对位置

    bitpos += self.frame_start[rxtx]
    bitpos += bitnum * self.bit_width
    
    • 帧起始偏移:从帧的起始样本点开始。
    • 位编号偏移:累加前序位的总样本数(如 bitnum=2 表示跨越 2 个完整位)。

示例场景

配置参数

  • 波特率 115200,采样率 1,000,000 Hz → 位宽 8.68 样本/位。
  • 采样点 70%,方向 RX,帧起始样本点 1000,位编号 1(第 1 个数据位)。

计算过程

  1. 位内偏移

    (8.68−1)×0.7≈5.376(8.68−1)×0.7≈5.376

  2. 绝对位置

    1000+5.376+1×8.68≈1014.0561000+5.376+1×8.68≈1014.056

  3. 最终采样点:约 1014(取整)。

wait_for_start_bit

  def wait_for_start_bit(self, rxtx, signal):# Save the sample number where the start bit begins.self.frame_start[rxtx] = self.samplenumself.frame_valid[rxtx] = Trueself.cur_frame_bit[rxtx] = 0
  • 核心作用:当检测到起始位(低电平信号)时,初始化帧解析所需的关键参数,并标记帧开始。
  • 触发条件:在 UART 空闲状态(高电平)中检测到信号跳变为低电平。
变量类型说明
frame_start列表(int)存储 RX/TX 方向帧的起始样本点(如 [1000, -1] 表示 RX 帧从样本 1000 开始)。
frame_valid列表(bool)标记帧是否有效(如校验通过则为 True,否则为 False)。
cur_frame_bit列表(int)当前处理的位索引(0 = 起始位,1~N = 数据位,N+1 = 校验位/停止位)。
samplenum整数当前解析器处理的样本点编号(由框架自动更新)。
  • 记录起始位样本点
    • self.frame_start[rxtx] = self.samplenum
      将当前采样点(self.samplenum)保存到对应方向(RX/TX)的 frame_start 数组中,用于后续计算数据位、校验位和停止位的时间范围。
  • 标记帧有效性
    • self.frame_valid[rxtx] = True
      假设帧在起始阶段有效,后续可能根据校验结果(如奇偶校验、停止位验证)更新此标记。
  • 初始化位计数器
    • self.cur_frame_bit[rxtx] = 0
      0 开始计数,表示当前处理的是起始位。数据位通常从索引 1 开始递增。

get_start_bit

    def get_start_bit(self, rxtx, signal):self.startbit[rxtx] = signalself.cur_frame_bit[rxtx] += 1# The startbit must be 0. If not, we report an error and wait# for the next start bit (assuming this one was spurious).if self.startbit[rxtx] != 0:self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = Falsees = self.samplenum + ceil(self.bit_width / 2.0)self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])self.advance_state(rxtx, signal, fatal = True, idle = es)return# Reset internal state for the pending UART frame.self.cur_data_bit[rxtx] = 0self.datavalue[rxtx] = 0self.paritybit[rxtx] = -1self.stopbits[rxtx].clear()self.startsample[rxtx] = -1self.databits[rxtx].clear()self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])self.advance_state(rxtx, signal)
startbit[rxtx]存储起始位电平值(应为 0)。
cur_frame_bit[rxtx]跟踪当前处理的帧位索引(起始位为 0,数据位从 1 开始)。
frame_valid[rxtx]标记帧有效性,初始为 True,错误时置 False
putp / putg输出日志和注释,分别对应 Python 对象通道和逻辑分析仪注释通道。
advance_state状态机驱动方法,控制解析流程(如跳转到数据位或空闲状态)。

记录起始位信号

self.startbit[rxtx] = signal
self.cur_frame_bit[rxtx] += 1
  • 目的
    • 保存当前信号值到 startbit 数组,标记起始位的电平状态。
    • 递增当前帧的位计数器(cur_frame_bit),表示开始处理起始位。

验证起始位有效性

if self.startbit[rxtx] != 0:# 错误处理:输出日志和警告注释self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])# 标记帧无效self.frame_valid[rxtx] = False# 计算错误帧的结束样本点es = self.samplenum + ceil(self.bit_width / 2.0)# 记录错误帧信息self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])# 推进状态机到空闲状态self.advance_state(rxtx, signal, fatal=True, idle=es)return
  • 核心逻辑:python
    • 若起始位非低电平(signal != 0),判定为无效起始位(可能由噪声引起)。
    • 错误处理步骤
      1. 输出日志:通过 putp 记录错误类型(INVALID STARTBIT)、方向和信号值。
      2. 可视化警告:通过 putg 在逻辑分析仪界面生成错误注释(如 Frame error)。
      3. 标记帧无效frame_valid[rxtx] = False,后续流程将忽略此帧。
      4. 记录错误帧范围:通过 putpse 标注错误帧的起止样本点,供调试参考。
      5. 状态转移:调用 advance_state 强制进入空闲状态(idle=es),等待下一帧起始位。

##有效起始位处理**

# 重置内部状态
self.cur_data_bit[rxtx] = 0
self.datavalue[rxtx] = 0
self.paritybit[rxtx] = -1
self.stopbits[rxtx].clear()
self.startsample[rxtx] = -1
self.databits[rxtx].clear()# 输出起始位信息
self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])# 推进状态机到下一状态
self.advance_state(rxtx, signal)
  • 目的
    • 状态初始化:清空或重置与帧解析相关的变量,确保无残留数据干扰新帧。
    • 日志与注释:记录有效起始位事件,支持后续调试和可视化。
    • 状态转移:调用 advance_state 进入数据位解析阶段。

handle_packet

    def handle_packet(self, rxtx):d = 'rx' if (rxtx == RX) else 'tx'delim = self.options[d + '_packet_delim']plen = self.options[d + '_packet_len']if delim == -1 and plen == -1:return# Cache data values until we see the delimiter and/or the specified# packet length has been reached (whichever happens first).if len(self.packet_cache[rxtx]) == 0:self.ss_packet[rxtx] = self.startsample[rxtx]self.packet_cache[rxtx].append(self.datavalue[rxtx])if self.datavalue[rxtx] == delim or len(self.packet_cache[rxtx]) == plen:self.es_packet[rxtx] = self.samplenums = ''for b in self.packet_cache[rxtx]:s += self.format_value(b)if self.options['format'] != 'ascii':s += ' 'if self.options['format'] != 'ascii' and s[-1] == ' ':s = s[:-1] # Drop trailing space.self.putx_packet(rxtx, [Ann.RX_PACKET + rxtx, [s]])self.packet_cache[rxtx] = []
  • 根据配置的分隔符或长度条件判断是否结束当前数据包的解析,并在满足条件时触发数据包的最终处理。
packet_cache[rxtx]临时存储数据包内容(如 [0x41, 0x42])。
ss_packet[rxtx]数据包起始样本点(第一个数据位的开始时间)。
es_packet[rxtx]数据包结束样本点(触发结束条件的当前样本点)。
rx_packet_delim接收方向的分隔符(如 10 表示换行符)。
tx_packet_len发送方向的固定数据包长度(如 64 字节)。
format_value(b)根据 format 配置(ASCII/HEX等)将字节转换为字符串。

确定配置参数

d = 'rx' if (rxtx == RX) else 'tx'  # 生成配置键前缀(如 "rx_" 或 "tx_")
delim = self.options[d + '_packet_delim']  # 分隔符(十进制ASCII值)
plen = self.options[d + '_packet_len']     # 数据包固定长度
if delim == -1 and plen == -1:return  # 未配置分隔符和长度,无需处理
  • 分隔符:若配置为 -1,表示不依赖分隔符结束数据包。
  • 固定长度:若配置为 -1,表示不依赖固定长度。
  • 检查有效性:若两者均为 -1,直接返回,数据包由其他逻辑(如停止位)触发结束。

缓存数据并记录起始样本点

if len(self.packet_cache[rxtx]) == 0:self.ss_packet[rxtx] = self.startsample[rxtx]  # 记录数据包起始样本点
self.packet_cache[rxtx].append(self.datavalue[rxtx])  # 缓存当前数据值
  • 首次缓存:若缓存为空,记录数据包的起始样本点(即第一个数据位的起始位置)。
  • 逐帧累积:将当前解析出的数据值(如 0x41)存入缓存列表。

判断数据包结束条件

if self.datavalue[rxtx] == delim or len(self.packet_cache[rxtx]) == plen:# 满足结束条件(分隔符或长度)self.es_packet[rxtx] = self.samplenum  # 记录数据包结束样本点# 格式化数据包内容s = ''for b in self.packet_cache[rxtx]:s += self.format_value(b)  # 根据配置(ASCII/HEX等)格式化值if self.options['format'] != 'ascii':s += ' '  # 非ASCII格式添加分隔符(如十六进制值之间加空格)if self.options['format'] != 'ascii' and s[-1] == ' ':s = s[:-1]  # 移除末尾多余空格# 输出数据包注释self.putx_packet(rxtx, [Ann.RX_PACKET + rxtx, [s]])self.packet_cache[rxtx] = []  # 清空缓存
  • 结束条件
    • 分隔符触发:当前数据值等于配置的分隔符(如 0x0A 表示换行符 \n)。
    • 长度触发:缓存数据长度达到配置的固定值(如 64 字节)。
  • 数据格式化
    • ASCII:直接拼接字符(如 b'\x41\x42'"AB")。
    • 非ASCII(HEX/DEC/OCT/BIN):用空格分隔值(如 "41 42")。
  • 输出与清理
    • 调用 putx_packet 输出数据包注释(如 Ann.RX_PACKET)。
    • 清空缓存,准备接收下一数据包。

get_data_bits

def get_data_bits(self, rxtx, signal):# Save the sample number of the middle of the first data bit.if self.startsample[rxtx] == -1:self.startsample[rxtx] = self.samplenumself.putg([Ann.RX_DATA_BIT + rxtx, ['%d' % signal]])# Store individual data bits and their start/end samplenumbers.s, halfbit = self.samplenum, int(self.bit_width / 2)self.databits[rxtx].append([signal, s - halfbit, s + halfbit])self.cur_frame_bit[rxtx] += 1# Return here, unless we already received all data bits.self.cur_data_bit[rxtx] += 1if self.cur_data_bit[rxtx] < self.options['data_bits']:return# Convert accumulated data bits to a data value.bits = [b[0] for b in self.databits[rxtx]]if self.options['bit_order'] == 'msb-first':bits.reverse()self.datavalue[rxtx] = bitpack(bits)self.putpx(rxtx, ['DATA', rxtx,(self.datavalue[rxtx], self.databits[rxtx])])b = self.datavalue[rxtx]formatted = self.format_value(b)if formatted is not None:self.putx(rxtx, [rxtx, [formatted]])bdata = b.to_bytes(self.bw, byteorder='big')self.putbin(rxtx, [Bin.RX + rxtx, bdata])self.putbin(rxtx, [Bin.RXTX, bdata])self.handle_packet(rxtx)self.databits[rxtx] = []self.advance_state(rxtx, signal)

format_value

    def format_value(self, v):# Format value 'v' according to configured options.# Reflects the user selected kind of representation, as well as# the number of data bits in the UART frames.fmt, bits = self.options['format'], self.options['data_bits']# Assume "is printable" for values from 32 to including 126,# below 32 is "control" and thus not printable, above 127 is# "not ASCII" in its strict sense, 127 (DEL) is not printable,# fall back to hex representation for non-printables.if fmt == 'ascii':if v in range(32, 126 + 1):return chr(v)hexfmt = "[{:02X}]" if bits <= 8 else "[{:03X}]"return hexfmt.format(v)# Mere number to text conversion without prefix and padding# for the "decimal" output format.if fmt == 'dec':return "{:d}".format(v)# Padding with leading zeroes for hex/oct/bin formats, but# without a prefix for density -- since the format is user# specified, there is no ambiguity.if fmt == 'hex':digits = (bits + 4 - 1) // 4fmtchar = "X"elif fmt == 'oct':digits = (bits + 3 - 1) // 3fmtchar = "o"elif fmt == 'bin':digits = bitsfmtchar = "b"else:fmtchar = Noneif fmtchar is not None:fmt = "{{:0{:d}{:s}}}".format(digits, fmtchar)return fmt.format(v)return None
  • 逐位接收数据,将其累积并转换为完整的数据值,同时生成可视化注释和结构化输出。其核心作用包括:
    1. 记录数据位的时序信息(起始/结束样本点)。
    2. 按配置的位顺序组合数据位(LSB-first 或 MSB-first)。
    3. 格式化数据值(如十六进制、ASCII)。
    4. 触发数据包处理逻辑(当数据位接收完成时)。

get_parity_bit

def get_parity_bit(self, rxtx, signal):self.paritybit[rxtx] = signalself.cur_frame_bit[rxtx] += 1if parity_ok(self.options['parity'], self.paritybit[rxtx],self.datavalue[rxtx], self.options['data_bits']):self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])self.putg([Ann.RX_PARITY_OK + rxtx, ['Parity bit', 'Parity', 'P']])else:# TODO: Return expected/actual parity values.self.putp(['PARITY ERROR', rxtx, (0, 1)]) # FIXME: Dummy tuple...self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])self.frame_valid[rxtx] = Falseself.advance_state(rxtx, signal)
  • 用于处理 UART 协议中的 奇偶校验位,验证其正确性并根据结果更新帧状态。以下是分步解析:
  1. 记录校验位值:保存校验位的电平值。
  2. 校验验证:根据配置的校验类型(奇校验、偶校验等),验证校验位是否合法。
  3. 状态更新与输出
    • 若校验通过:输出校验位信息至日志和可视化界面。
    • 若校验失败:标记帧为无效,输出错误信息。
  4. 状态转移:推进状态机至下一阶段(如停止位处理)

保存校验位值并更新位计数器

self.paritybit[rxtx] = signal
self.cur_frame_bit[rxtx] += 1
  • paritybit[rxtx]:存储校验位的实际电平值(01)。
  • cur_frame_bit[rxtx]:递增帧内位计数器,表示已处理完校验位。

校验位验证

if parity_ok(self.options['parity'], self.paritybit[rxtx],self.datavalue[rxtx], self.options['data_bits']):# 校验成功self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])self.putg([Ann.RX_PARITY_OK + rxtx, ['Parity bit', 'Parity', 'P']])
else:# 校验失败self.putp(['PARITY ERROR', rxtx, (0, 1)])  # 虚拟元组,需完善self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])self.frame_valid[rxtx] = False
  • parity_ok() 函数
    根据校验类型、校验位值、数据值(datavalue)和数据位数,返回校验结果。
    示例逻辑

    def parity_ok(parity_type, parity_bit, data, data_bits):if parity_type == 'none':return True  # 无校验,始终通过# 计算数据位中1的个数ones = bin(data).count('1') if parity_type == 'even':return (ones + parity_bit) % 2 == 0elif parity_type == 'odd':return (ones + parity_bit) % 2 == 1# 其他校验类型(如 'zero', 'one')...
    
  • 校验成功

    • putp:输出校验位信息到 Python 对象通道(如日志记录)。
    • putg:在逻辑分析仪界面标注校验位(如 “Parity bit”)。
  • 校验失败

    • 输出错误信息到 Python 和注释通道。
    • 标记帧无效(frame_valid[rxtx] = False),后续流程可能忽略此帧。
    • 注意:代码中使用了虚拟元组 (0, 1),需替换为实际期望值与实际值(如 (expected_parity, actual_parity))。

状态转移

self.advance_state(rxtx, signal)
  • 作用:推进状态机至下一状态(如处理停止位)。

  • 实现逻辑(伪代码):

    def advance_state(self, rxtx, signal):if self.state[rxtx] == 'PARITY_CHECK':self.state[rxtx] = 'PROCESS_STOP_BITS'
    

get_stop_bits

def get_stop_bits(self, rxtx, signal):self.stopbits[rxtx].append(signal)self.cur_frame_bit[rxtx] += 1# Stop bits must be 1. If not, we report an error.if signal != 1:self.putp(['INVALID STOPBIT', rxtx, signal])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = Falseself.putp(['STOPBIT', rxtx, signal])self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])# Postprocess the UART frame after all STOP bits were seen.if len(self.stopbits[rxtx]) < self.options['stop_bits']:returnself.advance_state(rxtx, signal)
  • get_stop_bits 负责 处理停止位,验证其合法性并控制状态机流转。核心功能包括:
  1. 记录停止位值:将每个停止位的电平值存入缓存。
  2. 停止位有效性检查:验证停止位是否为高电平(1),否则标记帧无效。
  3. 状态机推进:当所有配置的停止位处理完成后,进入下一状态(如空闲状态)。

记录停止位值并更新位计数器

self.stopbits[rxtx].append(signal)
self.cur_frame_bit[rxtx] += 1
  • stopbits[rxtx]:列表存储每个停止位的电平值(如 [1, 1])。
  • cur_frame_bit:递增帧内位计数器,表示已处理完当前停止位。

验证停止位合法性

if signal != 1:self.putp(['INVALID STOPBIT', rxtx, signal])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = False
  • 停止位规则:UART 协议要求停止位为高电平(1)。
  • 错误处理
    • putp:输出错误信息到 Python 对象通道(如日志记录)。
    • putg:在逻辑分析仪界面标注错误(如 Frame error)。
    • 标记帧无效(frame_valid[rxtx] = False)。

输出停止位信息

self.putp(['STOPBIT', rxtx, signal])
self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])
  • 日志与注释:无论停止位是否有效,均记录其存在(如 Stop bit)。

检查停止位数量

if len(self.stopbits[rxtx]) < self.options['stop_bits']:return
self.advance_state(rxtx, signal)
  • 数量判断:比较已接收的停止位数量与配置值(如 stop_bits=1.0)。
  • 潜在问题
    stop_bits 为浮点数(如 1.5),len(stopbits[rxtx]) 可能无法正确匹配(需特殊处理时间长度而非数量)。
  • 状态转移:满足数量后调用 advance_state,进入空闲状态或下一帧解析。
	def handle_frame(self, rxtx, ss, es):# Pass the complete UART frame to upper layers.self.putpse(ss, es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])def handle_idle(self, rxtx, ss, es):self.putpse(ss, es, ['IDLE', rxtx, 0])def handle_break(self, rxtx, ss, es):self.putpse(ss, es, ['BREAK', rxtx, 0])self.putgse(ss, es, [Ann.RX_BREAK + rxtx,['Break condition', 'Break', 'Brk', 'B']])self.state[rxtx] = 'WAIT FOR START BIT'

get_wait_cond

	def get_wait_cond(self, rxtx, inv):# Return condititions that are suitable for Decoder.wait(). Those# conditions either match the falling edge of the START bit, or# the sample point of the next bit time.state = self.state[rxtx]if state == 'WAIT FOR START BIT':return {rxtx: 'r' if inv else 'f'}if state in ('GET START BIT', 'GET DATA BITS','GET PARITY BIT', 'GET STOP BITS'):bitnum = self.cur_frame_bit[rxtx]# TODO: Currently does not support half STOP bits.want_num = ceil(self.get_sample_point(rxtx, bitnum))return {'skip': want_num - self.samplenum}
  • get_wait_cond方法用于动态生成解码器的等待条件,确保协议解析过程能够精确同步到UART帧的各个阶段。

根据当前解析状态(如等待起始位、处理数据位等),返回以下两种条件之一:

  1. 边沿触发条件:用于检测起始位的下降沿(或上升沿,若信号反转)。
  2. 采样点跳转条件:指示解码器需要跳过的样本数,以对齐到下一个位的采样点。

取当前状态

state = self.state[rxtx]
  • 作用
    根据方向(RX/TX)获取解析器的当前状态,如 'WAIT FOR START BIT''GET DATA BITS'

处理「等待起始位」状态

if state == 'WAIT FOR START BIT':return {rxtx: 'r' if inv else 'f'}
  • 逻辑
    • 起始位是UART帧的开始,协议要求起始位为低电平(逻辑 0)。
    • inv 参数的作用
      • inv = False(默认):等待下降沿'f'),即从空闲高电平跳变到起始位低电平。
      • inv = True(信号反转):等待上升沿'r'),即反转后的低电平变为高电平。
  • 返回值
    字典 {RX: 'f'}{TX: 'r'},表示需要监测对应通道的边沿事件。

处理「数据/校验/停止位」状态

if state in ('GET START BIT', 'GET DATA BITS', 'GET PARITY BIT', 'GET STOP BITS'):bitnum = self.cur_frame_bit[rxtx]want_num = ceil(self.get_sample_point(rxtx, bitnum))return {'skip': want_num - self.samplenum}
  • 逻辑
    1. 获取当前位号
      bitnum 表示当前正在处理的帧内位索引(如起始位为 0,数据位从 1 开始)。
    2. 计算目标样本点
      调用 get_sample_point(rxtx, bitnum) 计算当前位的采样点(如位的中间位置)。
      • 示例:若位宽为 10 样本,采样点配置为 50%,则采样点为 起始样本点 + 5
      • ceil 的作用:将浮点采样点向上取整,确保跳转到整数样本位置。
    3. 计算需跳过的样本数
      want_num - self.samplenum 表示从当前样本点 (self.samplenum) 到目标样本点 (want_num) 需要跳过的样本数。
  • 返回值
    字典 {'skip': N},指示解码器跳过 N 个样本后继续解析。

get_idle_cond

def get_idle_cond(self, rxtx, inv):# Return a condition that corresponds to the (expected) end of# the next frame, assuming that it will be an "idle frame"# (constant high input level for the frame's length).if self.idle_start[rxtx] is None:return Noneend_of_frame = self.idle_start[rxtx] + self.frame_len_sample_countif end_of_frame < self.samplenum:return Nonereturn {'skip': end_of_frame - self.samplenum}
  • 在 UART 协议解析中用于检测空闲状态(帧间持续高电平),其作用是根据当前空闲开始时间计算需要跳过的样本数以确认空闲帧结束。

空闲状态检测条件

  • 检查空闲开始时间:若 self.idle_start[rxtx]None,表示未开始检测空闲,返回 None
  • 计算预期结束点end_of_frame = 空闲开始时间 + 一帧总样本数
  • 超时判断:若当前样本 samplenum 已超过 end_of_frame,返回 None(空闲已结束)。
  • 返回跳过样本数:若未超时,返回需跳过的样本数 `{‘skip’:

inspect_sample

def inspect_sample(self, rxtx, signal, inv):# Inspect a sample returned by .wait() for the specified UART line.if inv:signal = not signalstate = self.state[rxtx]if state == 'WAIT FOR START BIT':self.wait_for_start_bit(rxtx, signal)elif state == 'GET START BIT':self.get_start_bit(rxtx, signal)elif state == 'GET DATA BITS':self.get_data_bits(rxtx, signal)elif state == 'GET PARITY BIT':self.get_parity_bit(rxtx, signal)elif state == 'GET STOP BITS':self.get_stop_bits(rxtx, signal)
  • 是 UART 协议解析器的核心驱动方法,根据当前解析状态(如等待起始位、处理数据位等),处理单个样本点并调用相应的子方法推进解析流程。

信号反转处理

if inv:signal = not signal
  • 作用:根据 inv 配置调整电平逻辑,适配硬件差异。
    • inv = Truesignal = 1 变为 0signal = 0 变为 1
    • 应用场景:某些硬件(如 RS-232)使用负逻辑(高电平为 0,低电平为 1),需反转信号。

状态路由与解析

state = self.state[rxtx]
if state == 'WAIT FOR START BIT':self.wait_for_start_bit(rxtx, signal)
elif state == 'GET START BIT':self.get_start_bit(rxtx, signal)
elif state == 'GET DATA BITS':self.get_data_bits(rxtx, signal)
elif state == 'GET PARITY BIT':self.get_parity_bit(rxtx, signal)
elif state == 'GET STOP BITS':self.get_stop_bits(rxtx, signal)
  • 状态机驱动:根据当前状态调用对应的处理方法,各状态含义如下:
状态调用的方法作用
WAIT FOR START BITwait_for_start_bit检测起始位下降沿,初始化帧解析状态。
GET START BITget_start_bit验证起始位有效性,重置数据缓存,准备接收数据位。
GET DATA BITSget_data_bits逐位接收数据,组合为整数值,格式化输出。
GET PARITY BITget_parity_bit验证校验位合法性,标记帧有效性。
GET STOP BITSget_stop_bits检查停止位是否为高电平,完成帧解析后推进至空闲状态。

执行流程示例

场景:接收一帧 8N1 数据(无校验,1停止位)

  1. 初始状态state = 'WAIT FOR START BIT'
    • 检测到下降沿(反转后为低电平),调用 wait_for_start_bit
      • 记录起始位样本点,状态变为 'GET START BIT'
  2. 处理起始位state = 'GET START BIT'
    • 调用 get_start_bit
      • 验证起始位为低电平,状态变为 'GET DATA BITS'
  3. 接收数据位state = 'GET DATA BITS'
    • 循环调用 get_data_bits 8次:
      • 每次读取一位数据,存储并更新数据值。
      • 完成8位后,状态变为 'GET STOP BITS'
  4. 处理停止位state = 'GET STOP BITS'
    • 调用 get_stop_bits
      • 验证停止位为高电平,状态回到 'WAIT FOR START BIT'

inspect_edge

def inspect_edge(self, rxtx, signal, inv):# Inspect edges, independently from traffic, to detect break conditions.if inv:signal = not signalif not signal:# Signal went low. Start another interval.self.break_start[rxtx] = self.samplenumreturn# Signal went high. Was there an extended period with low signal?if self.break_start[rxtx] is None:returndiff = self.samplenum - self.break_start[rxtx]if diff >= self.break_min_sample_count:ss, es = self.frame_start[rxtx], self.samplenumself.handle_break(rxtx, ss, es)self.break_start[rxtx] = Nonedef inspect_idle(self, rxtx, signal, inv):# Check each edge and each period of stable input (either level).# Can derive the "idle frame period has passed" condition.if inv:signal = not signalif not signal:# Low input, cease inspection.self.idle_start[rxtx] = Nonereturn# High input, either just reached, or still stable.if self.idle_start[rxtx] is None:self.idle_start[rxtx] = self.samplenumdiff = self.samplenum - self.idle_start[rxtx]if diff < self.frame_len_sample_count:returnss, es = self.idle_start[rxtx], self.samplenumself.handle_idle(rxtx, ss, es)self.idle_start[rxtx] = es
  • 通过检测信号边沿来识别 BREAK 条件(持续低电平超过一帧时间)
  1. 信号反转

    if inv:signal = not signal
    
    • 适配硬件电平逻辑(如 RS-232 使用负逻辑)。
  2. 检测下降沿(低电平)

    if not signal:self.break_start[rxtx] = self.samplenumreturn
    
    • 当信号变为低电平时,记录起始样本点 break_start[rxtx]
  3. 检测上升沿(高电平)

    if self.break_start[rxtx] is None:return
    diff = self.samplenum - self.break_start[rxtx]
    if diff >= self.break_min_sample_count:ss, es = self.frame_start[rxtx], self.samplenumself.handle_break(rxtx, ss, es)
    self.break_start[rxtx] = None
    
    • 当信号恢复高电平时,计算低电平持续时间 diff

    • diff 超过阈值 break_min_sample_count(通常为一帧时间的样本数),调用 handle_break 处理 BREAK 条件。

    • 潜在问题ss 错误地使用 frame_start[rxtx](应为 break_start[rxtx]),需修正为:

      ss, es = self.break_start[rxtx], self.samplenum
      
  • 检测 空闲状态(持续高电平超过一帧时间),用于验证帧间间隔的合规性。
  1. 信号反转

    if inv:signal = not signal
    
  2. 低电平处理

    if not signal:self.idle_start[rxtx] = Nonereturn
    
    • 检测到低电平时,重置空闲起始点 idle_start[rxtx]
  3. 高电平处理

    if self.idle_start[rxtx] is None:self.idle_start[rxtx] = self.samplenum
    diff = self.samplenum - self.idle_start[rxtx]
    if diff < self.frame_len_sample_count:return
    ss, es = self.idle_start[rxtx], self.samplenum
    self.handle_idle(rxtx, ss, es)
    self.idle_start[rxtx] = es
    
    • 持续高电平时,计算持续时间 diff
    • 若超过阈值 frame_len_sample_count(通常为一帧时间),调用 handle_idle 处理空闲状态。
    • 潜在问题:更新 idle_start[rxtx] = es 可能导致后续空闲段被分割。建议改为 idle_start[rxtx] = None 以重新检测连续空闲。

decode

 def decode(self):if not self.samplerate:raise SamplerateError('Cannot decode without samplerate.')has_pin = [self.has_channel(ch) for ch in (RX, TX)]if not True in has_pin:raise ChannelError('Need at least one of TX or RX pins.')opt = self.optionsinv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']cond_data_idx = [None] * len(has_pin)# Determine the number of samples for a complete frame's time span.# A period of low signal (at least) that long is a break condition.frame_samples = 1 # STARTframe_samples += self.options['data_bits']frame_samples += 0 if self.options['parity'] == 'none' else 1frame_samples += self.options['stop_bits']frame_samples *= self.bit_widthself.frame_len_sample_count = ceil(frame_samples)self.break_min_sample_count = self.frame_len_sample_countcond_edge_idx = [None] * len(has_pin)cond_idle_idx = [None] * len(has_pin)while True:conds = []if has_pin[RX]:cond_data_idx[RX] = len(conds)conds.append(self.get_wait_cond(RX, inv[RX]))cond_edge_idx[RX] = len(conds)conds.append({RX: 'e'})cond_idle_idx[RX] = Noneidle_cond = self.get_idle_cond(RX, inv[RX])if idle_cond:cond_idle_idx[RX] = len(conds)conds.append(idle_cond)if has_pin[TX]:cond_data_idx[TX] = len(conds)conds.append(self.get_wait_cond(TX, inv[TX]))cond_edge_idx[TX] = len(conds)conds.append({TX: 'e'})cond_idle_idx[TX] = Noneidle_cond = self.get_idle_cond(TX, inv[TX])if idle_cond:cond_idle_idx[TX] = len(conds)conds.append(idle_cond)(rx, tx) = self.wait(conds)if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:self.inspect_sample(RX, rx, inv[RX])if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:self.inspect_edge(RX, rx, inv[RX])self.inspect_idle(RX, rx, inv[RX])if cond_idle_idx[RX] is not None and self.matched[cond_idle_idx[RX]]:self.inspect_idle(RX, rx, inv[RX])if cond_data_idx[TX] is not None and self.matched[cond_data_idx[TX]]:self.inspect_sample(TX, tx, inv[TX])if cond_edge_idx[TX] is not None and self.matched[cond_edge_idx[TX]]:self.inspect_edge(TX, tx, inv[TX])self.inspect_idle(TX, tx, inv[TX])if cond_idle_idx[TX] is not None and self.matched[cond_idle_idx[TX]]:self.inspect_idle(TX, tx, inv[TX])
  • UART 协议解析器的入口方法,负责协调整个解析流程。其核心任务包括:
  1. 初始化校验:确保采样率和通道配置有效。
  2. 计算帧参数:确定帧长度和 BREAK 条件的最小样本数。
  3. 事件循环:监听 RX/TX 通道的各类事件(数据、边沿、空闲),并调用对应的处理方法。

初始校验

if not self.samplerate:raise SamplerateError('Cannot decode without samplerate.')has_pin = [self.has_channel(ch) for ch in (RX, TX)]
if not True in has_pin:raise ChannelError('Need at least one of TX or RX pins.')
  • 校验采样率:必须配置采样率以计算时间相关参数。
  • 校验通道:至少启用 RX 或 TX 通道。

信号反转配置

opt = self.options
inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
  • inv 列表:标记 RX 和 TX 是否需要信号反转。例如,inv[RX] = True 表示 RX 通道的电平逻辑需取反。

帧参数计算

frame_samples = 1  # START
frame_samples += self.options['data_bits']
frame_samples += 0 if self.options['parity'] == 'none' else 1
frame_samples += self.options['stop_bits']
frame_samples *= self.bit_width
self.frame_len_sample_count = ceil(frame_samples)
self.break_min_sample_count = self.frame_len_sample_count
  • 帧长度计算

    • 起始位:固定 1 位。
    • 数据位data_bits 位。
    • 校验位:根据配置(parity)决定是否添加。
    • 停止位:直接累加 stop_bits(可能为浮点数,如 1.5)。
    • 总样本数:总位数 × 位宽(bit_width)。
  • 问题:浮点停止位直接累加到整数 frame_samples 会导致精度丢失。
    改进建议

    frame_samples = (1 +  # STARTself.options['data_bits'] +(0 if self.options['parity'] == 'none' else 1) +self.options['stop_bits']
    ) * self.bit_width
    self.frame_len_sample_count = ceil(frame_samples)
    

主事件循环

while True:conds = []# 为 RX 和 TX 通道构建等待条件if has_pin[RX]:# 数据条件(如等待起始位或采样点)cond_data_idx[RX] = len(conds)conds.append(self.get_wait_cond(RX, inv[RX]))# 边沿条件(检测任何边沿)cond_edge_idx[RX] = len(conds)conds.append({RX: 'e'})# 空闲条件(持续高电平)cond_idle_idx[RX] = Noneidle_cond = self.get_idle_cond(RX, inv[RX])if idle_cond:cond_idle_idx[RX] = len(conds)conds.append(idle_cond)# 同理处理 TX 通道...# 等待任一条件触发(rx, tx) = self.wait(conds)# 处理 RX 通道触发的事件if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:self.inspect_sample(RX, rx, inv[RX])if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:self.inspect_edge(RX, rx, inv[RX])self.inspect_idle(RX, rx, inv[RX])if cond_idle_idx[RX] is not None and self.matched[cond_idle_idx[RX]]:self.inspect_idle(RX, rx, inv[RX])# 同理处理 TX 通道触发的事件...

步骤分解

  1. 构建等待条件列表 conds
    • 数据条件:由 get_wait_cond 返回,如等待起始位下降沿或数据采样点。
    • 边沿条件:监听通道的任何边沿事件('e')。
    • 空闲条件:由 get_idle_cond 返回,如持续高电平超时。
  2. 等待事件触发
    • self.wait(conds) 阻塞直到任一条件满足,返回当前 RX/TX 信号值。
  3. 处理触发事件
    • 数据条件:调用 inspect_sample 解析数据位。
    • 边沿条件:调用 inspect_edge 检测 BREAK 条件,并检查空闲状态。
    • 空闲条件:直接调用 inspect_idle
方法作用
get_wait_cond(rxtx, inv)返回当前状态下的等待条件(如边沿或采样点跳转)。
get_idle_cond(rxtx, inv)返回检测空闲状态的条件(如持续高电平超过阈值)。
inspect_sample解析数据位、校验位或停止位。
inspect_edge检测 BREAK 条件(长低电平)。
inspect_idle处理空闲状态(持续高电平)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/76489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

技术分享|iTOP-RK3588开发板Ubuntu20系统旋转屏幕方案

iTOP-3588开发板采用瑞芯微RK3588处理器&#xff0c;是全新一代AloT高端应用芯片&#xff0c;采用8nmLP制程&#xff0c;搭载八核64位CPU&#xff0c;四核Cortex-A76和四核Cortex-A55架构&#xff0c;主频高达2.4GHz。是一款可用于互联网设备和其它数字多媒体的高性能产品。 在…

Unity IL2CPP内存泄漏追踪方案(基于Memory Profiler)技术详解

一、IL2CPP内存管理特性与泄漏根源 1. IL2CPP内存架构特点 内存区域管理方式常见泄漏类型托管堆(Managed)GC自动回收静态引用/事件订阅未取消原生堆(Native)手动管理非托管资源未释放桥接层GCHandle/PInvoke跨语言引用未正确释放 对惹&#xff0c;这里有一个游戏开发交流小组…

消融实验_草稿

五列数据 \begin{table}[htbp]\caption{Performance Comparison of Standalone KD Variants vs MIRKD-enhanced Variants on ACNE04 Dataset\label{AblationKD}}\centering\renewcommand{\arraystretch}{1.2}\scriptsize\begin{tabularx}{\linewidth}{{}l *{3}{>{\centering…

面向对象高级(1)

文章目录 final认识final关键字修饰类&#xff1a;修饰方法&#xff1a;修饰变量final修饰变量的注意事项 常量 单例类什么是设计模式&#xff1f;单例怎么写?饿汉式单例的特点是什么&#xff1f;单例有啥应用场景&#xff0c;有啥好处&#xff1f;懒汉式单例类。 枚举类认识枚…

不用额外下载jar包,idea快速查看使用的组件源码

以nacos为例子&#xff0c;在idea中引入了nacos依赖&#xff0c;就可以查看源码了。 2. idea选择open&#xff08;不关闭项目直接选择file-open也可以&#xff09;, 在maven的仓库里找到对应的包&#xff0c;打开 2.idea中选择 jar包&#xff0c;选择 add as library 3.这样j…

小白学习java第12天:IO流之缓冲流

1.IO缓冲流&#xff1a; 之前我们学习的都是原始流&#xff08;FileInputStream字节输入流、FileOutputStream字节输出流、FIleReader字符输入流、FIleWriter字符输出流&#xff09;其实我们可以知道对于这些其实性能都不是很好&#xff0c;要么太慢一个一个&#xff0c;要么就…

高速电路设计概述

1.1 低速设计和高速设计的例子 本节通过一个简单的例子&#xff0c;探讨高速电路设计相对于低速电路设计需要考虑哪些不同的问题。希望读者通过本例&#xff0c;对高速电路设计建立一个表象的认识。至于高速电路设计中各方面的设计要点&#xff0c;将在后续章节展开详细的讨论…

MySQL8.0.31安装教程,附pdf资料和压缩包文件

参考资料&#xff1a;黑马程序员 一、下载 点开下面的链接&#xff1a;https://dev.mysql.com/downloads/mysql/ 点击Download 就可以下载对应的安装包了, 安装包如下: 我用夸克网盘分享了「mysql」&#xff0c;链接&#xff1a;https://pan.quark.cn/s/ab7b7acd572b 二、解…

在Java项目中,引入【全局异常处理器】

目录 一.为什么引入全局异常处理器&#xff08;目前项目碰到了什么问题&#xff09;&#xff1f; 1.问题描述 2.与预期的差别 3.解决方案 二.解决上述问题 1.定义【业务异常类】 2.在serviceImpl层&#xff0c;手动抛出【违反唯一性约束】这个异常 3.定义【全局异常处理…

newspaper公共库获取每个 URL 对应的新闻内容,并将提取的新闻正文保存到一个文件中

示例代码&#xff1a; from newspaper import Article from newspaper import Config import json from tqdm import tqdm import os import requestswith open(datasource/api/news_api.json, r) as file:data json.load(file)print(len(data)) save_path datasource/sourc…

前端核心知识:Vue 3 编程的 10 个实用技巧

文章目录 1. **使用 ref 和 reactive 管理响应式数据**原理解析代码示例注意事项 2. **组合式 API&#xff08;Composition API&#xff09;**原理解析代码示例优势 3. **使用 watch 和 watchEffect 监听数据变化**原理解析代码示例注意事项 4. **使用 provide 和 inject 实现跨…

【Web API系列】XMLHttpRequest API和Fetch API深入理解与应用指南

前言 在现代Web开发中&#xff0c;客户端与服务器之间的异步通信是构建动态应用的核心能力。无论是传统的AJAX技术&#xff08;基于XMLHttpRequest&#xff09;还是现代的Fetch API&#xff0c;它们都为实现这一目标提供了关键支持。本文将从底层原理、核心功能、代码实践到实…

[特殊字符] Spring Boot 日志系统入门博客大纲(适合初学者)

一、前言 &#x1f4cc; 为什么日志在项目中如此重要&#xff1f; 在开发和维护一个后端系统时&#xff0c;日志就像程序运行时的“黑匣子”&#xff0c;帮我们记录系统的各种行为和异常。一份良好的日志&#xff0c;不仅能帮助我们快速定位问题&#xff0c;还能在以下场景中…

IP协议之IP,ICMP协议

1.因特网中的主要协议是TCP/IP&#xff0c;Interneet协议也叫TCP/IP协议簇 2.ip地址用点分十进制表示&#xff0c;由32位的二进制表示&#xff0c;两部分组成&#xff1a;网络标识主机标识 3.IP地址分类; A:0.0.0.0-127.255.255.255 B&#xff1a;128.0.0.0-191.255.255.25…

GPIO_ReadInputData和GPIO_ReadInputDataBit区别

目录 1、GPIO_ReadInputData: 2、GPIO_ReadInputDataBit: 总结 GPIO_ReadInputData 和 GPIO_ReadInputDataBit 是两个函数&#xff0c;通常用于读取微控制器GPIO&#xff08;通用输入输出&#xff09;引脚的输入状态&#xff0c;特别是在STM32系列微控制器中。它们之间的主要…

洛古B4158 [BCSP-X 2024 12 月小学高年级组] 质数补全(线性筛/dfs)

B4158 [BCSP-X 2024 12 月小学高年级组] 质数补全 - 洛谷 思路1:线性筛,字符串匹配,枚举 质数筛选 要解决这个问题&#xff0c;首先得找出指定范围内&#xff08;这里是 1 到 10000000&#xff09;的所有质数。常用的质数筛选算法有埃拉托斯特尼筛法&#xff08;埃氏筛&#…

一周学会Pandas2 Python数据处理与分析-Pandas2读取Excel

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili Excel格式文件是办公使用和处理最多的文件格式之一&#xff0c;相比CSV文件&#xff0c;Excel是有样式的。Pandas2提…

NVIDIA H100 vs A100:新一代GPU架构性能对比分析

一、核心架构演进对比 ‌Ampere架构&#xff08;A100&#xff09;‌采用台积电7nm工艺&#xff0c;集成540亿晶体管&#xff0c;配备6,912个CUDA核心和432个第三代Tensor Core&#xff0c;支持FP16、TF32和INT8精度计算。其显存子系统采用HBM2e技术&#xff0c;80GB版本带宽可…

保护PCBA的不同方法:喷三防漆 vs 镀膜

PCBA&#xff08;印刷电路板组件&#xff09;的防护工艺中&#xff0c;喷三防漆和镀膜&#xff08;如Parylene气相沉积&#xff09;是两种常见技 术。它们在防护目的上类似&#xff0c;但在具体实现方式和应用场景上有显著差异。以下从外观、工艺、性 能、物理性质和成本五个…

VitePress 项目部署 cloudflare page 提示 npm run build 错误

构建的错误信息如下&#xff1a; 09:52:57.975 ➤ YN0000: Done with warnings in 3s 120ms 09:52:58.072 Executing user command: npm run build 09:52:58.817 npm ERR! Missing script: "build" 09:52:58.818 npm ERR! 09:52:58.818 npm ERR! To see a list of …