内容太多了,这篇记录UDP接收端
一,UDP接收端接收数据
有了pollnet这个开源项目的支持,接收端的步骤为:1)初始化硬编码的参数:接口,IP和端口
2)创建接收文件.csv
3)读取UDP包并将其写入文件中
std::atomic<bool> active = {true};
int main(int argc, char *argv[]) {//硬编码的参数:接口、IP和端口const char* interface = "enp8s0f1"; // 网口const char* local_ip = "10.100.100.161"; // 本地IP地址int local_port = 12346; // 本地端口
// const char* sub_ip = "10.100.100.162"; // 可选的订阅IPEfviUdpReceiver receiver;if (!receiver.init(interface, local_ip, local_port)) {cout << receiver.getLastError() << endl;return 1;}std::string fname1 = "接收文件.csv";//bool ret1 = isFileExists_ifstream(fname1);if (ret1){//文件存在cout<<"File is exit\n"<<endl;} else {std::ofstream outFile1(fname1);if (outFile1){std::cout <<"Files created successful.\n" << std::endl;}else {std::cout <<"Failed to create files.\n" << std::endl;return 1;}//关闭文件outFile1.close();}ofstream outFile1(fname1, ios::out);outFile1 << "DataTimeStamp"<<endl;while (active.load(std::memory_order_acquire)) {receiver.read(outFile1, outFile2, outFile3, outFile4);}return 0;
}
声明一个原子布尔类型的变量 active
,并初始化为 true
。这是为了控制接收器的运行状态。使用 std::atomic
类型确保在多线程环境下对 active
的操作是安全的。
std::atomic<bool> active = {true};
c++的标准流函数,用于检查指定路径的文件是否存在
bool isFileExists_ifstream(string& name) {ifstream f(name.c_str());return f.good();
}
ofstream outFile1(fname1, ios::out);
:使用ofstream
打开fname1
文件,并准备写入数据。ios::out
表示以输出模式打开文件。outFile1 << "DataTimeStamp" << endl;
:写入一个标题DataTimeStamp
到文件,之后换行。这为后续的接收数据做准备。
ofstream outFile1(fname1, ios::out);
outFile1 << "DataTimeStamp" << endl;
输出模式(ios::out
)的含义:
ios::out
表示文件被以写入模式打开,也就是用于向文件写入数据。- 如果文件已经存在,
ios::out
会清空文件内容(覆盖文件)。 - 如果文件不存在,
ios::out
会创建一个新文件。 ios::in
:表示以输入模式打开文件,用于从文件读取数据。ios::app
:表示以追加模式打开文件,写入的数据将被添加到文件末尾,而不是覆盖原文件内容。ios::ate
:表示打开文件后,文件指针将移动到文件的末尾,但仍然可以进行读写操作。ios::trunc
:表示打开文件时,如果文件已经存在,则清空文件内容(这个模式是ios::out
的默认行为)。ios::binary
:表示以二进制模式打开文件,而不是文本模式。
二,pollnet项目
这个项目是git的一个开源项目:https://github.com/MengRao/pollnet/tree/master(MIT许可证)
包括一些TCP,UDP的发送接收的封装类,Efvi与Socket的发送方式demo等等
我主要学习了Efvi.h这个库文件,其中包含EfviUdpSender和EfviReceiver两个大类,其中EfviReceiver包括EfviUdpReceiver,EfviEthReceiver,可以看到这个库函数主要是封装了UDP通信的发送接收类。
1,EfviUdpSender
这个类中主要有公有函数init,write,close,私有函数getMacFromARP,getGW,init_udp_pkt,ci_ip4_hdr_init,saveError,hexchartoi等等
1)init
函数 init
的目的是初始化网络接口、配置地址和端口,设置虚拟接口并准备内存以便发送数据。它的主要功能是准备好一个高效的网络通信环境,具体来说是通过 EFVi 驱动(一个与高速网络适配器或类似硬件接口的驱动相关的工具)进行 UDP 数据包的发送。
struct sockaddr_in local_addr;
struct sockaddr_in dest_addr;
uint8_t local_mac[6];
uint8_t dest_mac[6];
local_addr.sin_port = htons(local_port);
inet_pton(AF_INET, local_ip, &(local_addr.sin_addr));
dest_addr.sin_port = htons(dest_port);
inet_pton(AF_INET, dest_ip, &(dest_addr.sin_addr));
local_addr
和dest_addr
是用于本地和目标地址的结构体(sockaddr_in
);- 使用
inet_pton
将 IP 地址(字符串)转换为二进制格式,存储到sin_addr
中; - 端口号使用
htons
函数转换为网络字节序。
if ((0xff & dest_addr.sin_addr.s_addr) < 224) {char dest_mac_addr[64];if (!getMacFromARP(interface, dest_ip, dest_mac_addr)) {char gw[64];if (!getGW(dest_ip, gw) || !getMacFromARP(interface, gw, dest_mac_addr)) {saveError("Can't find dest ip from arp cache, please ping dest ip first", 0);return false;}}if (strlen(dest_mac_addr) != 17) {saveError("invalid dest_mac_addr", 0);return false;}for (int i = 0; i < 6; ++i) {dest_mac[i] = hexchartoi(dest_mac_addr[3 * i]) * 16 + hexchartoi(dest_mac_addr[3 * i + 1]);}
}
- 检查目标地址是否为单播地址(
dest_ip
的最后一个字节是否小于 224,如果小于则是单播地址)。 - 如果是单播地址,调用
getMacFromARP
函数获取目标 IP 的 MAC 地址。如果 ARP 表中没有目标 IP 的 MAC 地址,尝试从网关获取。 hexchartoi
是一个将十六进制字符转换为整数的辅助函数。
if ((0xff & dest_addr.sin_addr.s_addr) < 224) {char dest_mac_addr[64];if (!getMacFromARP(interface, dest_ip, dest_mac_addr)) {char gw[64];if (!getGW(dest_ip, gw) || !getMacFromARP(interface, gw, dest_mac_addr)) {saveError("Can't find dest ip from arp cache, please ping dest ip first", 0);return false;}}if (strlen(dest_mac_addr) != 17) {saveError("invalid dest_mac_addr", 0);return false;}for (int i = 0; i < 6; ++i) {dest_mac[i] = hexchartoi(dest_mac_addr[3 * i]) * 16 + hexchartoi(dest_mac_addr[3 * i + 1]);}
}
- 如果目标地址是组播地址(即目标地址的第一字节大于等于 224),则根据组播地址生成目标 MAC 地址。
- 组播地址的 MAC 地址计算规则是:以
01:00:5e
开头,后面三字节根据 IP 地址最后三个字节生成。
else {dest_mac[0] = 0x1;dest_mac[1] = 0;dest_mac[2] = 0x5e;dest_mac[3] = 0x7f & (dest_addr.sin_addr.s_addr >> 8);dest_mac[4] = 0xff & (dest_addr.sin_addr.s_addr >> 16);dest_mac[5] = 0xff & (dest_addr.sin_addr.s_addr >> 24);
}
- 打开驱动程序并初始化虚拟接口句柄(
dh
)。ef_driver_open
是一个操作系统或驱动接口的函数,用来初始化虚拟接口。 ef_pd_alloc_by_name
用于分配一个数据包描述符(Packet Descriptor,简称 PD),该描述符用于管理网络数据包。
int rc;
if ((rc = ef_driver_open(&dh)) < 0) {saveError("ef_driver_open failed", rc);return false;
}
if ((rc = ef_pd_alloc_by_name(&pd, dh, interface, EF_PD_DEFAULT)) < 0) {saveError("ef_pd_alloc_by_name failed", rc);return false;
}
- 配置虚拟接口的标志和能力,检查虚拟接口是否支持 CTPIO(Contiguous Transmission Protocol Input/Output)。如果支持 CTPIO,则在虚拟接口标志中启用相应的标志。
int vi_flags = EF_VI_FLAGS_DEFAULT;
int ifindex = if_nametoindex(interface);
unsigned long capability_val = 0;
if (ef_vi_capabilities_get(dh, ifindex, EF_VI_CAP_CTPIO, &capability_val) == 0 && capability_val) {use_ctpio = true;vi_flags |= EF_VI_TX_CTPIO;
}
- 使用
ef_vi_alloc_from_pd
从数据包描述符分配虚拟接口(VI)。该接口用于管理虚拟网络设备并发送/接收数据包。 ef_vi_get_mac
获取分配的虚拟接口的 MAC 地址。
size_t alloc_size = N_BUF * PKT_BUF_SIZE;
buf_mmapped = true;
pkt_bufs = (uint8_t*)mmap(NULL, alloc_size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_HUGETLB, -1, 0);
if (pkt_bufs == MAP_FAILED) {buf_mmapped = false;rc = posix_memalign((void**)&pkt_bufs, 4096, alloc_size);if (rc != 0) {saveError("posix_memalign failed", -rc);return false;}
}
- 为网络数据包分配内存缓冲区。首先尝试使用
mmap
将内存映射到进程地址空间,如果失败则使用posix_memalign
来对齐内存。
if ((rc = ef_memreg_alloc(&memreg, dh, &pd, dh, pkt_bufs, alloc_size)) < 0) {saveError("ef_memreg_alloc failed", rc);return false;
}
- 使用
ef_memreg_alloc
为缓冲区注册内存,确保其可以通过网络适配器访问
for (int i = 0; i < N_BUF; i++) {struct pkt_buf* pkt = (struct pkt_buf*)(pkt_bufs + i * PKT_BUF_SIZE);pkt->post_addr = ef_memreg_dma_addr(&memreg, i * PKT_BUF_SIZE) + sizeof(ef_addr);init_udp_pkt(&(pkt->eth), local_addr, local_mac, dest_addr, dest_mac);
}
- 为每个数据包初始化缓冲区,并填充 UDP 包的相关信息,如源地址、目标地址和 MAC 地址。
uint16_t* ip4 = (uint16_t*)&((struct pkt_buf*)pkt_bufs)->ip4;
ipsum_cache = 0;
for (int i = 0; i < 10; i++) {ipsum_cache += ip4[i];
}
ipsum_cache = (ipsum_cache >> 16u) + (ipsum_cache & 0xffff);
ipsum_cache += (ipsum_cache >> 16u);
- 计算数据包的校验和(
ipsum_cache
),确保数据在传输过程中的完整性。
2)write
bool write(const void* data, uint32_t size) {// 为缓冲区分配内存
// uint8_t* pkt_bufs = new uint8_t[N_BUF * PKT_BUF_SIZE];struct pkt_buf* pkt = (struct pkt_buf*)(pkt_bufs + buf_index_ * PKT_BUF_SIZE);struct ci_ether_hdr* eth = &pkt->eth;struct ci_ip4_hdr* ip4 = (struct ci_ip4_hdr*)(eth + 1);struct ci_udp_hdr* udp = (struct ci_udp_hdr*)(ip4 + 1);uint16_t iplen = htons(28 + size);// 假设IP头部为28字节ip4->ip_tot_len_be16 = iplen;udp->udp_len_be16 = htons(8 + size);// 将 data 数据复制到数据包中
// memcpy(pkt + 1, data, size);// 将数据复制到数据包中memcpy(reinterpret_cast<void*>(pkt + 1), data, size); // 正确的内存拷贝uint32_t frame_len = 42 + size;int rc;if (use_ctpio) {uint32_t ipsum = ipsum_cache + iplen;ipsum += (ipsum >> 16u);ip4->ip_check_be16 = ~ipsum & 0xffff;// 使用CTPIO方式发送数据ef_vi_transmit_ctpio(&vi, &pkt->eth, frame_len, frame_len);rc = ef_vi_transmit_ctpio_fallback(&vi, pkt->post_addr, frame_len, buf_index_);}else {rc = ef_vi_transmit(&vi, pkt->post_addr, frame_len, buf_index_);}// 更新缓冲区索引buf_index_ = (buf_index_ + 1) % N_BUF;// 处理传输事件ef_event evs[EF_VI_EVENT_POLL_MIN_EVS];ef_request_id ids[EF_VI_TRANSMIT_BATCH];int events = ef_eventq_poll(&vi, evs, EF_VI_EVENT_POLL_MIN_EVS);for (int i = 0; i < events; ++i) {if (EF_EVENT_TYPE_TX == EF_EVENT_TYPE(evs[i])) {ef_vi_transmit_unbundle(&vi, &evs[i], ids);}}// 释放内存
// delete[] pkt_bufs;return true;}
- 通过
pkt_bufs
和buf_index_
来访问一个缓冲区中的数据包。pkt_bufs
是之前分配的一个内存区域,buf_index_
是当前正在使用的缓冲区索引。 pkt
是一个指向当前缓冲区的指针,类型是pkt_buf
,它包含以太网头、IP头和UDP头。
- 通过指针
eth
、ip4
和udp
来访问数据包中的以太网头、IP头和UDP头。 iplen
计算IP头部和数据部分的总长度。这里假设IP头部是28字节,数据部分的大小是size
。ip_tot_len_be16
和udp_len_be16
分别是IP和UDP头中的长度字段,它们需要以网络字节序(大端序)来表示。- 将传入的
data
数据复制到数据包中的适当位置(在以太网头、IP头和UDP头之后的位置)。这里pkt + 1
是指向数据部分的指针。 memcpy
将原始数据复制到数据包中的正确位置。frame_len
计算整个数据包的长度,42字节是以太网头、IP头和UDP头的总长度。-
这里分为两种发送模式:
- CTPIO模式:使用 CTPIO 方式发送数据。这是一种直接传输模式,通过
ef_vi_transmit_ctpio
发送数据。如果 CTPIO 发送失败,则会使用回退方式ef_vi_transmit_ctpio_fallback
。 - 普通发送模式:如果不使用 CTPIO,则直接通过
ef_vi_transmit
发送数据包。
- CTPIO模式:使用 CTPIO 方式发送数据。这是一种直接传输模式,通过
-
ipsum_cache
是一个缓存的校验和,用来计算IP头部的校验和。校验和计算方法是将所有IP头的16位部分加起来,并进行取反操作,确保数据的完整性。 -
在发送完当前数据包后,更新
buf_index_
,使得缓冲区索引在0
到N_BUF-1
之间循环。 - 使用
ef_eventq_poll
检查是否有传输事件。这通常是在传输过程中可能发生的事件(如传输成功、失败等)。 - 如果是发送事件(
EF_EVENT_TYPE_TX
),则通过ef_vi_transmit_unbundle
处理事件,并返回请求ID。
2,EfviUdpReceiver
包含init函数与read函数
1)Init
bool init(const char* interface, const char* dest_ip, uint16_t dest_port, const char* subscribe_ip = "") {if (!EfviReceiver::init(interface)) {return false;}udp_prefix_len = 64 + ef_vi_receive_prefix_len(&vi) + 14 + 20 + 8;int rc;ef_filter_spec filter_spec;struct sockaddr_in sa_local;sa_local.sin_port = htons(dest_port);inet_pton(AF_INET, dest_ip, &(sa_local.sin_addr));ef_filter_spec_init(&filter_spec, EF_FILTER_FLAG_NONE);if ((rc = ef_filter_spec_set_ip4_local(&filter_spec, IPPROTO_UDP, sa_local.sin_addr.s_addr, sa_local.sin_port)) <0) {std::cerr << "ef_filter_spec_set_ip4_local failed" << rc << std::endl;return false;}if ((rc = ef_vi_filter_add(&vi, dh, &filter_spec, NULL)) < 0) {std::cerr << "ef_vi_filter_add failed" << rc << std::endl;return false;}if (subscribe_ip[0]) {if ((subscribe_fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {std::cerr << "socket failed" << -errno << std::endl;return false;}struct ip_mreq group;inet_pton(AF_INET, subscribe_ip, &(group.imr_interface));inet_pton(AF_INET, dest_ip, &(group.imr_multiaddr));if (setsockopt(subscribe_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&group, sizeof(group)) < 0) {std::cerr << "setsockopt IP_ADD_MEMBERSHIP failed" << -errno << std::endl;return false;}}return true;}
初始化网络通信,配置和设置了接收UDP数据包的过滤器并加入多播组。
- interface:指定要用于接收数据包的网络接口。
- dest_ip:目标 IP 地址,数据包的目的地址。
- dest_port:目标端口,接收数据的 UDP 端口号。
- subscribe_ip:可选的多播地址。如果设置了此参数,函数将加入到指定的多播组。
udp_prefix_len = 64 + ef_vi_receive_prefix_len(&vi) + 14 + 20 + 8;
udp_prefix_len
是 UDP 数据包的前缀长度,包含:
-
64
:可能是与硬件或特定网络库相关的长度。 -
ef_vi_receive_prefix_len(&vi)
:获取网络接口接收前缀长度。 -
14
:以太网头部长度(Ethernet header)。 -
20
:IPv4头部长度(IP header)。 -
8
:UDP头部长度。
ef_filter_spec filter_spec;
struct sockaddr_in sa_local;
sa_local.sin_port = htons(dest_port);
inet_pton(AF_INET, dest_ip, &(sa_local.sin_addr));
ef_filter_spec_init(&filter_spec, EF_FILTER_FLAG_NONE);
-
创建一个
filter_spec
过滤器规格对象,和一个sockaddr_in
地址结构体sa_local
,它用来设置目标 IP 和端口。 -
inet_pton
将目标 IP 地址转换为网络字节序的地址格式。 -
htons
将目标端口号转换为网络字节序。
if ((rc = ef_filter_spec_set_ip4_local(&filter_spec, IPPROTO_UDP, sa_local.sin_addr.s_addr, sa_local.sin_port)) < 0) {std::cerr << "ef_filter_spec_set_ip4_local failed" << rc << std::endl;return false;
}
- 设置过滤器,指定目标协议为
UDP
,目标 IP 和端口为dest_ip
和dest_port
。 - 如果配置过滤器失败,输出错误并返回
false
。
if ((rc = ef_vi_filter_add(&vi, dh, &filter_spec, NULL)) < 0) {std::cerr << "ef_vi_filter_add failed" << rc << std::endl;return false;
}
- 将刚刚配置好的过滤器添加到接收网络接口中。
vi
是接收接口对象,dh
是设备句柄,filter_spec
是过滤器规格。 - 如果添加失败,输出错误并返回
false
。
if (subscribe_ip[0]) {if ((subscribe_fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {std::cerr << "socket failed" << -errno << std::endl;return false;}struct ip_mreq group;inet_pton(AF_INET, subscribe_ip, &(group.imr_interface));inet_pton(AF_INET, dest_ip, &(group.imr_multiaddr));if (setsockopt(subscribe_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&group, sizeof(group)) < 0) {std::cerr << "setsockopt IP_ADD_MEMBERSHIP failed" << -errno << std::endl;return false;}
}
- 如果传入了
subscribe_ip
参数,表示需要加入一个多播组。 - 创建一个
socket
用于接收多播数据。 - 使用
inet_pton
转换多播地址和本地网络接口的 IP 地址。 - 使用
setsockopt
函数将本地接口加入到指定的多播组。 - 如果多播加入失败,输出错误并返回
false
。
2) read
这个和原来的有一些修改:
ef_eventq_poll
:从事件队列中获取接收到的数据包。pkt_buf
:指向接收到的网络数据包的缓冲区。printPacket
:打印数据包的内容。
void read(ofstream &outFile1, ofstream &outFile2, ofstream &outFile3, ofstream &outFile4) {ef_event evs;if (ef_eventq_poll(&vi, &evs, 1) == 0) return;int id = EF_EVENT_RX_RQ_ID(evs);pkt_buf = (struct pkt_buf*)(pkt_bufs + id * PKT_BUF_SIZE);if (EF_EVENT_TYPE(evs) == EF_EVENT_TYPE_RX) {const char* buf = (const char*)pkt_buf + 64;const int bufLen = EF_EVENT_RX_BYTES(evs);if(bufLen != 0){// memcpy(udp_pkt_data, buf, bufLen);//TODO: 此处是否可以直接从DMA缓冲区里读取?TORALEV2API::CTORATstpLev2MarketDataField temp_MarketDataField = {0};//快照TORALEV2API::CTORATstpLev2OrderDetailField temp_OrderDetailField = {0};//逐笔委托TORALEV2API::CTORATstpLev2TransactionField temp_TransactionField = {0};//逐笔成交TORALEV2API::CTORATstpLev2XTSmergeField temp_SmergeField = {0};//合并主笔PARASE_CTORATstpLev2MarketDataField(buf+42, temp_MarketDataField, outFile1);
// PARASE_CTORATstpLev2OrderDetailField(buf+42, temp_OrderDetailField, outFile2);
// //行情快照 662
// if (buf[42]==0x30&&buf[43]==0x00&&buf[44]==0x00&&buf[45]==0x05) {
// PARASE_CTORATstpLev2MarketDataField(buf+42, temp_MarketDataField, outFile1);
// }
// //逐笔委托 110bytes
// if (buf[42]==0x03&&buf[43]==0x00&&buf[44]==0x00&&buf[45]==0x08){
// PARASE_CTORATstpLev2OrderDetailField(buf+42, temp_OrderDetailField, outFile2);
// }
// //逐笔成交 118bytes
// if (buf[42]==0x03&&buf[43]==0x00&&buf[44]==0x00&&buf[45]==0x07){
// PARASE_CTORATstpLev2TransactionField(buf+42, temp_TransactionField, outFile3);
// }
// //合并主笔 118bytes
// if (buf[42]==0x30&&buf[43]==0x00&&buf[44]==0x00&&buf[45]==0x13){ //固定填值:0x13000030
// PARASE_CTORATstpLev2XTSmergeField(buf+42, temp_SmergeField, outFile4);
// }printPacket(buf, bufLen);}}ef_vi_receive_post(&vi, pkt_buf->post_addr, id);return;}private:int udp_prefix_len;struct pkt_buf* pkt_buf;int subscribe_fd_ = -1;};
if (ef_eventq_poll(&vi, &evs, 1) == 0) return;
这一行从网络事件队列中获取事件,vi
是 ef_vi
设备实例,evs
存储事件信息。ef_eventq_poll
的返回值为 0 时表示没有事件可处理。
我曾在接收端修改了缓冲区,在write函数中手动new了pkt_bufs,直接release了pkt_bufs,导致在接收部分ef_eventq_poll
的返回值为 0 时表示没有事件可处理。
pkt_buf = (struct pkt_buf*)(pkt_bufs + id * PKT_BUF_SIZE);
根据事件的 id
,从 pkt_bufs
数组中获取对应的接收缓冲区。
const char* buf = (const char*)pkt_buf + 64;
const int bufLen = EF_EVENT_RX_BYTES(evs);
if (bufLen != 0) {TORALEV2API::CTORATstpLev2MarketDataField temp_MarketDataField = {0};TORALEV2API::CTORATstpLev2OrderDetailField temp_OrderDetailField = {0};TORALEV2API::CTORATstpLev2TransactionField temp_TransactionField = {0};TORALEV2API::CTORATstpLev2XTSmergeField temp_SmergeField = {0};PARASE_CTORATstpLev2MarketDataField(buf+42, temp_MarketDataField, outFile1);
}
-
buf
指向接收到的数据包,从第 64 字节开始(可能是去除掉了 Ethernet 和 IP 头部),然后通过EF_EVENT_RX_BYTES
获取数据包长度。之后,解析不同的字段数据并将其写入对应的输出流(例如outFile1
)。
printPacket(buf, bufLen);
调用 printPacket
函数,打印数据包的内容,通常用于调试。
ef_vi_receive_post(&vi, pkt_buf->post_addr, id);
调用 ef_vi_receive_post
完成数据包的处理,并将接收缓冲区重新发布。