目录
XOA是什么
XOA CLI
XOA Python API
XOA Python Test Suite/测试套件
XOA Converter
Source Code
XOA是什么
XOA(Xena Open-Source Automation)是一个开源的测试自动化框架,追求“高效、易用、灵活”的跨操作系统的开发框架。能与Xena现有解决方案无缝配合,借助XOA可调用Xena(Z系列打流仪、E系列损伤仪)完成自动化测试任务。同时它提供了操作接口可将其他仪表/设备做并栈集成测试验证,统一整理输出测试报告。
XOA包含:
XOA CLI、XOA Python API、XOA Python TestSuite、XOA Converter
XOA CLI
XOA CLI 提供了一套简洁直观的基于文本语言的独立命令,用以控制和集成 Xena测试仪硬件,实现各种测试任务的自动化。
任何客户端平台/编程语言(如 Python、Tcl、Bash)都可与 XOA CLI 配合使用。
CLI 可在远程登录终端上使用,直接向 Xena 测试仪发送命令。
ValkyrieManager通过测试端口配置文件(.xpc ),可在 XOA CLI 环境之间无缝转换。
XOA Python API
XOA Python API与XOA CLI和XenaManager无缝集成
- 面向对象的高级抽象: XOA Python API采用面向对象的方法,提供了更高层次的抽象,加快了自动化脚本的开发。
- 集成开发环境自动完成,内置手册: XOA Python API 包含 IDE 自动完成以及类、函数和 API 内置手册等功能,可显著提高开发效率。
- 命令分组和响应自动匹配:该功能允许命令分组和响应自动匹配,从而优化了测试执行效率。
- 服务器到客户端推送通知订阅: XOA Python API 支持服务器到客户端的推送通知订阅,降低了用户代码的复杂性。
- 支持 Python 3.8 及更高版本: XOA Python API 兼容 Python 3.8 及更高版本,确保与现代 Python 环境兼容。
import asyncio from contextlib import suppress from xoa_driver import testers from xoa_driver import modules from xoa_driver import ports from xoa_driver import utils from xoa_driver import enums from xoa_driver import exceptions from ipaddress import IPv4Address, IPv6Address from binascii import hexlify from xoa_driver.misc import Hex#--------------------------- # Global parameters #---------------------------CHASSIS_IP = "10.165.16.70" # Chassis IP address or hostname USERNAME = "XOA" # Username MODULE_INDEX = 4 # Module index TX_PORT_INDEX = 0 # TX Port indexFRAME_SIZE_BYTES = 4178 # Frame size on wire including the FCS. FRAME_COUNT = 20 # The number of frames including the first, the middle, and the last. REPETITION = 1 # The number of repetitions of the frame sequence, set to 0 if you want the port to repeat over and over TRAFFIC_RATE_FPS = 100 # Traffic rate in frames per second TRAFFIC_RATE_PERCENT = int(4/10 * 1000000)SHOULD_BURST = False # Whether the middle frames should be bursty BURST_SIZE_FRAMES = 9 # Burst size in frames for the middle frames INTER_BURST_GAP_BYTES = 3000 # The inter-burst gap in bytes INTRA_BURST_GAP_BYTES = 1000 # The inter-frame gap within a burst, aka. intra-burst gap, in bytes#--------------------------- # Header content for streams #--------------------------- class Ethernet:def __init__(self):self.dst_mac = "0000.0000.0000"self.src_mac = "0000.0000.0000"self.ethertype = "86DD"def __str__(self):_dst_mac = self.dst_mac.replace(".", "")_src_mac = self.src_mac.replace(".", "")_ethertype = self.ethertypereturn f"{_dst_mac}{_src_mac}{_ethertype}".upper()class IPV4:def __init__(self):self.version = 4self.header_length = 5self.dscp = 0self.ecn = 0self.total_length = 42self.identification = "0000"self.flags = 0self.offset = 0self.ttl = 255self.proto = 255self.checksum = "0000"self.src = "0.0.0.0"self.dst = "0.0.0.0"def __str__(self):_ver = '{:01X}'.format(self.version)_header_length = '{:01X}'.format(self.header_length)_dscp_ecn = '{:02X}'.format((self.dscp<<2)+self.ecn)_total_len = '{:04X}'.format(self.total_length)_ident = self.identification_flag_offset = '{:04X}'.format((self.flags<<13)+self.offset)_ttl = '{:02X}'.format(self.ttl)_proto = '{:02X}'.format(self.proto)_check = self.checksum_src = hexlify(IPv4Address(self.src).packed).decode()_dst = hexlify(IPv4Address(self.dst).packed).decode()return f"{_ver}{_header_length}{_dscp_ecn}{_total_len}{_ident}{_flag_offset}{_ttl}{_proto}{_check}{_src}{_dst}".upper()class IPV6:def __init__(self):self.version = 6self.traff_class = 8self.flow_label = 0self.payload_length = 0self.next_header = "11"self.hop_limit = 1self.src = "2000::2"self.dst = "2000::100"def __str__(self):_ver = '{:01X}'.format(self.version)_traff_class = '{:01X}'.format(self.traff_class)_flow_label = '{:06X}'.format(self.flow_label)_payload_len = '{:04X}'.format(self.payload_length)_next_header = self.next_header_hop_limit = '{:02X}'.format(self.hop_limit)_src = hexlify(IPv6Address(self.src).packed).decode()_dst = hexlify(IPv6Address(self.dst).packed).decode()return f"{_ver}{_traff_class}{_flow_label}{_payload_len}{_next_header}{_hop_limit}{_src}{_dst}".upper()class UDP:def __init__(self):self.src_port = 0self.dst_port = 0self.length = 0self.checksum = 0def __str__(self):_src_port = '{:04X}'.format(self.src_port)_dst_port = '{:04X}'.format(self.dst_port)_length = '{:04X}'.format(self.length)_checksum = '{:04X}'.format(self.checksum)return f"{_src_port}{_dst_port}{_length}{_checksum}".upper()class ROCEV2:def __init__(self):self.opcode = 0self.solicited_event = 0self.mig_req = 0self.pad_count = 1self.header_version = 0self.partition_key = 65535self.reserved = 7self.dest_queue_pair = 2self.ack_request = 0self.reserved_7bits = 0self.packet_seq_number =0def __str__(self):_opcode = '{:02X}'.format(self.opcode)_combo_1 = '{:02X}'.format((self.solicited_event<<7)+(self.mig_req<<6)+(self.pad_count<<4)+self.header_version)_pk = '{:04X}'.format(self.partition_key)_reserved = '{:02X}'.format(self.reserved)_qp = '{:06X}'.format(self.dest_queue_pair)_combo_2 = '{:02X}'.format((self.ack_request<<7)+self.reserved_7bits)_ps = '{:06X}'.format(self.packet_seq_number)return f"{_opcode}{_combo_1}{_pk}{_reserved}{_qp}{_combo_2}{_ps}".upper()#------------------------------ # def my_awesome_func() #------------------------------ async def my_awesome_func(stop_event: asyncio.Event, should_burst: bool) -> None:"""This Python function uses XOA Python API to configure the TX port:param stop_event::type stop_event: asyncio.Event:param should_burst: Whether the middle frames should be bursty.:type should_burst: bool"""# create tester instance and establish connectiontester = await testers.L23Tester(CHASSIS_IP, USERNAME, enable_logging=False) # access the module on the testermodule = tester.modules.obtain(MODULE_INDEX)# check if the module is of type Loki-100G-5S-2Pif not isinstance(module, modules.ModuleChimera):# access the txport on the moduletxport = module.ports.obtain(TX_PORT_INDEX)#---------------------------# Port reservation#---------------------------print(f"#---------------------------")print(f"# Port reservation")print(f"#---------------------------")if txport.is_released():print(f"The txport is released (not owned by anyone). Will reserve the txport to continue txport configuration.")await txport.reservation.set_reserve() # set reservation , means txport will be controlled by our sessionelif not txport.is_reserved_by_me():print(f"The txport is reserved by others. Will relinquish and reserve the txport to continue txport configuration.")await txport.reservation.set_relinquish() # send relinquish the txportawait txport.reservation.set_reserve() # set reservation , means txport will be controlled by our session#---------------------------# Start port configuration#---------------------------print(f"#---------------------------")print(f"# Start port configuration")print(f"#---------------------------")print(f"Reset the txport")await txport.reset.set()print(f"Configure the txport")await utils.apply(# txport.speed.mode.selection.set(mode=enums.PortSpeedMode.F100G),txport.comment.set(comment="RoCE2 on Loki"),txport.tx_config.enable.set_on(),txport.latency_config.offset.set(offset=0),txport.latency_config.mode.set(mode=enums.LatencyMode.LAST2LAST),txport.tx_config.burst_period.set(burst_period=0),txport.tx_config.packet_limit.set(packet_count_limit=FRAME_COUNT*REPETITION),txport.max_header_length.set(max_header_length=128),txport.autotrain.set(interval=0),txport.loop_back.set_none(), # If you want loopback the port TX to its own RX, change it to set_txoff2rx()txport.checksum.set(offset=0),txport.tx_config.delay.set(delay_val=0),txport.tpld_mode.set_normal(),txport.payload_mode.set_normal(),#txport.rate.pps.set(port_rate_pps=TRAFFIC_RATE_FPS), # If you want to control traffic rate with FPS, uncomment this.txport.rate.fraction.set(TRAFFIC_RATE_PERCENT), # If you want to control traffic rate with fraction, uncomment this. 1,000,000 = 100%)if should_burst:await txport.tx_config.mode.set_burst()else:await txport.tx_config.mode.set_sequential()#--------------------------------------# Configure stream_0 on the txport#--------------------------------------print(f" Configure first-packet stream on the txport")stream_0 = await txport.streams.create()eth = Ethernet()eth.src_mac = "aaaa.aaaa.0005"eth.dst_mac = "bbbb.bbbb.0005"ipv4 = IPV4()ipv4.src = "1.1.1.5"ipv4.dst = "2.2.2.5"ipv6 = IPV6()ipv6.src = "2001::5"ipv6.dst = "2002::5"udp = UDP()udp.src_port = 4791udp.dst_port = 4791rocev2 = ROCEV2()rocev2.opcode = 0rocev2.dest_queue_pair = 2rocev2.packet_seq_number = 0await utils.apply(stream_0.enable.set_on(),stream_0.packet.limit.set(packet_count=1),stream_0.comment.set(f"First packet"),stream_0.rate.fraction.set(stream_rate_ppm=10000),stream_0.packet.header.protocol.set(segments=[enums.ProtocolOption.ETHERNET,enums.ProtocolOption.IPV6,enums.ProtocolOption.UDP,enums.ProtocolOption.RAW_12,]),stream_0.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),stream_0.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),stream_0.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("AABBCCDD")),stream_0.tpld_id.set(test_payload_identifier = 0),stream_0.insert_packets_checksum.set_on())if should_burst:await stream_0.burst.burstiness.set(size=1, density=100)await stream_0.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)#--------------------------------------# Configure stream_1 on the txport#--------------------------------------print(f" Configure middle-packets stream on the txport")stream_1 = await txport.streams.create()rocev2.opcode = 1rocev2.dest_queue_pair = 2rocev2.packet_seq_number = 1await utils.apply(stream_1.enable.set_on(),stream_1.packet.limit.set(packet_count=FRAME_COUNT-2),stream_1.comment.set(f"Middle packets"),stream_1.rate.fraction.set(stream_rate_ppm=10000),stream_1.packet.header.protocol.set(segments=[enums.ProtocolOption.ETHERNET,enums.ProtocolOption.IPV6,enums.ProtocolOption.UDP,enums.ProtocolOption.RAW_12,]),stream_1.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),stream_1.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),stream_1.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("AABBCCDD")),stream_1.tpld_id.set(test_payload_identifier = 1),stream_1.insert_packets_checksum.set_on())if should_burst:await stream_1.burst.burstiness.set(size=BURST_SIZE_FRAMES, density=100)await stream_1.burst.gap.set(inter_packet_gap=INTRA_BURST_GAP_BYTES, inter_burst_gap=INTER_BURST_GAP_BYTES)# Configure a modifier on the stream_1await stream_1.packet.header.modifiers.configure(1)# Modifier on the SQNmodifier = stream_1.packet.header.modifiers.obtain(0)await modifier.specification.set(position=72, mask="FFFF0000", action=enums.ModifierAction.INC, repetition=1)await modifier.range.set(min_val=1, step=1, max_val=FRAME_COUNT-2)#--------------------------------------# Configure stream_2 on the txport#--------------------------------------print(f" Configure last-packet stream on the txport")stream_2 = await txport.streams.create()rocev2.opcode = 2rocev2.dest_queue_pair = 2rocev2.packet_seq_number = FRAME_COUNT-1await utils.apply(stream_2.enable.set_on(),stream_2.packet.limit.set(packet_count=1),stream_2.comment.set(f"Last packet"),stream_2.rate.fraction.set(stream_rate_ppm=10000),stream_2.packet.header.protocol.set(segments=[enums.ProtocolOption.ETHERNET,enums.ProtocolOption.IPV6,enums.ProtocolOption.UDP,enums.ProtocolOption.RAW_12,]),stream_2.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),stream_2.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),stream_2.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("AABBCCDD")),stream_2.tpld_id.set(test_payload_identifier = 2),stream_2.insert_packets_checksum.set_on())if should_burst:await stream_2.burst.burstiness.set(size=1, density=100)await stream_2.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)async def main():stop_event =asyncio.Event()try:await my_awesome_func(stop_event, should_burst=SHOULD_BURST)except KeyboardInterrupt:stop_event.set()if __name__=="__main__":asyncio.run(main())
XOA Python Test Suite/测试套件
XOA Python 测试套件是一个测试框架,为开发人员和测试专家执行和集成 Xena 测试套件提供了定义明确的 API。
该框架以自动化方式处理各种任务,如测试资源管理、测试执行和发布测试结果。
每个 RFC 测试套件都被设计成独立的 "插件",可根据需要有选择性地集成到项目中。
目前,XOA Python 测试套件包括
- RFC2544
- RFC2889
- RFC3918
XOA Converter
如果您希望将当前的 Xena 测试套件配置快速迁移到 XOA,现在使用 XOA 转换器工具比以往任何时候都更容易。
以前,Xena的测试套件应用程序仅与Windows兼容。但今后,所有现有和未来的测试套件都将并入 XOA Python 测试套件,从而消除 Windows 限制。
为了简化过渡,我们推出了 XOA 转换器。该工具允许用户将现有的Xena测试套件配置(Xena2544、Xena2889和Xena3918)从Xena窗口桌面应用程序无缝迁移到XOA Python测试套件中。有了 XOA 转换器,迁移过程变得轻松简单。
Source Code
GitHub 是我们托管 XOA 源代码的首选平台,因为它具有出色的版本控制和协作能力。它为管理代码变更提供了一个极佳的环境,确保项目的历史记录完备且易于访问。我们崇尚开放,鼓励每个人使用、分享、贡献和反馈我们的源代码。GitHub 允许进行无缝协作,并促进以社区为导向的方法,让每个人都能积极参与 XOA 的开发和改进。我们重视来自社区的意见和贡献,因为这能提高源代码的整体质量和创新性。
- XOA Python API Source Code
- XOA Python Test Suite – Core Source Code
- XOA Python Test Suite – Plugin Source Code
- XOA ANLT Utility Source Code
- XOA Converter Source Code