dpdk原理概述及核心源码剖析

dpdk原理

1、操作系统、计算机网络诞生已经几十年了,部分功能不再能满足现在的业务需求。如果对操作系统做更改,成本非常高,所以部分问题是在应用层想办法解决的,比如前面介绍的协程、quic等,都是在应用层重新开发的框架,简单回顾如下:

  • 协程:server多线程通信时,如果每连接一个客户端就要生成一个线程去处理,对server硬件资源消耗极大!为了解决多线程以及互相切换带来的性能损耗,应用层发明了协程框架:单线程人为控制跳转到不同的代码块执行,避免了cpu浪费、线程锁/切换等一系列耗时的问题!

  • quic协议:tcp协议已经深度嵌入了操作系统,更改起来难度很大,所以同样也是在应用层基于udp协议实现了tls、拥塞控制等,彻底让协议和操作系统松耦合!

除了上述问题,操作还有另一个比较严重的问题:基于os内核的网络数据IO!传统做网络开发时,接收和发送数据用的是操作系统提供的receive和send函数,用户配置一下网络参数、传入应用层的数据即可!操作系统由于集成了协议栈,会在用户传输的应用层数据前面加上协议不同层级的包头,然后通过网卡发送数据;接收到的数据处理方式类似,按照协议类型一层一层拨开,直到获取到应用层的数据!整个流程大致如下:

网卡接受数据----->发出硬件中断通知cpu来取数据----->os把数据复制到内存并启动内核线程          --->软件中断--->内核线程在协议栈中处理包--->处理完毕通知用户层

大家有没有觉得这个链条忒长啊?这么长的处理流程带来的问题:

  • “中间商”多,整个流程耗时;数据进入下一个环节时容易cache miss

  • 同一份数据在内存不同的地方存储(缓存内存、内核内存、用户空间的内存),浪费内存

  • 网卡通过中断通知cpu,每次硬中断大约消耗100微秒,这还不算因为终止上下文所带来的Cache Miss(L1、L2、TLB等cpu的cache可能都会更新)

  • 用户到内核态的上下文切换耗时

  • 数据在内核态用户态之间切换拷贝带来大量CPU消耗,全局锁竞争

  • 内核工作在多核上,为保障全局一致,即使采用Lock Free,也避免不了锁总线、内存屏障带来的性能损耗

这一系列的问题都是内核处理网卡接收到的数据导致的。大胆一点想象:如果不让内核处理网卡数据了?能不能避免上述各个环节的损耗了?能不能让3环的应用直接控制网卡收发数据了?

2、如果真的通过3环应用层直接读写网卡,面临的问题:

  • 用户空间的内存要映射到网卡,才能直接读写网卡

  • 驱动要运行在用户空间

(1)这两个问题是怎么解决的了?这一切都得益于linux提供的UIO机制! UIO 能够拦截中断,并重设中断回调行为(相当于hook了,这个功能还是要在内核实现的,因为硬件中断只能在内核处理),从而绕过内核协议栈后续的处理流程。这里借用别人的一张图:

UIO 设备的实现机制其实是对用户空间暴露文件接口,比如当注册一个 UIO 设备 uioX,就会出现文件 /dev/uioX(用于读取中断,底层还是要在内核处理,因为硬件中断只能发生在内核),对该文件的读写就是对设备内存的读写(通过mmap实现)。除此之外,对设备的控制还可以通过 /sys/class/uio 下的各个文件的读写来完成。所以UIO的本质:

  •  让用户空间的程序拦截内核的中断,更改中断的handler处理函数,让用户空间的程序第一时间拿到刚从网卡接收到的“一手、热乎”数据,减少内核的数据处理流程!由于应用程序拿到的是网络链路层(也就是第二层)的数据,这就需要应用程序自己按照协议解析数据了!说个额外的:这个功能可以用来抓包!

简化后的示意图如下:原本网卡是由操作系统内核接管的,现在直接由3环的dpdk应用控制了!

这就是dpdk的第一个优点;除了这个,还有以下几个:

(2)Huge Page 大页:传统页面大小是4Kb,如果进程要使用64G内存,则64G/4KB=16000000(一千六百万)页,所有在页表项中占用16000000 * 4B=62MB;但是TLB缓存的空间是有限的,不可能存储这么多页面的地址映射关系,所以可能导致TLB miss;如果改成2MB的huge Page,所需页面减少到64G/2MB=2000个。在TLB容量有限的情况下尽可能地多在TLB存放地址映射,极大减少了TLB miss!下图是采用不同大小页面时TLB能覆盖的内存对比!

(3)mempool 内存池:任何网络协议都要处理报文,这些报文肯定是存放在内存的!申请和释放内存就需要调用malloc和free函数了!这两个是系统调用,涉及到上下文切换;同时还要用buddy或slab算法查找空闲内存块,效率较低!dpdk 在用户空间实现了一套精巧的内存池技术,内核空间和用户空间的内存交互不进行拷贝,只做控制权转移。当收发数据包时,就减少了内存拷贝的开销!

(4)Ring 无锁环:多线程/多进程之间互斥,传统的方式就是上锁!但是dpdk基于 Linux 内核的无锁环形缓冲 kfifo 实现了自己的一套无锁机制,支持多消费者或单消费者出队、多生产者或单生产者入队;

(5)PMD poll-mode网卡驱动:网络IO监听有两种方式,分别是

  • 事件驱动,比如epoll:这种方式进程让出cpu后等数据;一旦有了数据,网卡通过中断通知操作系统,然后唤醒进程继续执行!这种方式适合于接收的数据量不大,但实时性要求高的场景;

  • 轮询,比如poll:本质就是用死循环不停的检查内存有没有数据到来!这种方式适合于接收大块数据,实时性要求不高的场景;

总的来说说:中断是外界强加给的信号,必须被动应对,而轮询则是应用程序主动地处理事情。前者最大的影响就是打断系统当前工作的连续性,而后者则不会,事务的安排自在掌握!

dpdk采用第二种轮询方式:直接用死循环不停的地检查网卡内存,带来了零拷贝、无系统调用的好处,同时避免了网卡硬件中断带来的上下文切换(理论上会消耗300个时钟周期)、cache miss、硬中断执行等损耗!

(6)NUMA:dpdk 内存分配上通过 proc 提供的内存信息,使 CPU 核心尽量使用靠近其所在节点的内存,避免了跨 NUMA 节点远程访问内存的性能问题;其软件架构去中心化,尽量避免全局共享,带来全局竞争,失去横向扩展的能力

(7)CPU 亲和性: dpdk 利用 CPU 的亲和性将一个线程或多个线程绑定到一个或多个 CPU 上,这样在线程执行过程中,就不会被随意调度,一方面减少了线程间的频繁切换带来的开销,另一方面避免了 CPU L1、L2、TLB等缓存的局部失效性,增加了 CPU cache的命中率。

dpdk学习视频推荐

dpdk专题讲解,dpdk/网络协议栈/vpp/OvS/DDos/SDN/NFV/虚拟化/高性能网络专家之路icon-default.png?t=N7T8https://www.bilibili.com/video/BV1fY411z7Y8/

Linux C/C++开发(后端/音视频/游戏/嵌入式/高性能网络/存储/基础架构/安全)

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

dpdk核心源码

dpdk是intel主导开发的网络编程框架, 有这么多的优点,都是怎么实现的了?

1、UIO原理:dpdk绕过了操作系统内核,直接接管网卡,用户程序可以直接在3环读写网卡的数据,这就涉及到两个关键技术点了:

  • 地址映射:3环的程序是怎么定位到网卡数据存放在哪的了?

  • 拦截硬件中断:传统数据处理流程是网卡收到数据后通过硬件中断通知cpu来取数据,3环的程序肯定要拦截这个中断,然后通过轮询方式取数据,这个又是怎么实现的了?

(1)地址映射:3环程序最常使用的就是内存地址了,一共32或64bit;C/C++层面可以通过指针直接读写地址的值;除了内存,还有很多设备也需要和cpu交互数据,比如显示器:要在屏幕显示的内容肯定是需要用户指定的,用户程序可以把显示的内容发送到显示器指定的地方,然后再屏幕打印出来。为了方便用户程序发送数据,硬件层面会把显示器的部分存储空间映射到内存地址,做到了和内存条硬件的寻址方式一样,用户也可以直接通过指针往这里写数据(汇编层面直接通过mov指令操作即可)!网卡也类似:网卡是插在pci插槽的,网卡(或者说pci插槽)的存储空间也会映射到内存地址,应用程序读写这块物理地址就等同于读写网卡的存储空间!实际写代码时,由于要深入驱动,pci网卡预留物理的内存与io空间会保存到uio设备上,相当于将这些物理空间与io空间暴露给uio设备,应用程序访问这些uio设备即可!几个关键的函数如下:

将pci网卡的物理内存空间以及io空间保存在uio设备结构struct uio_info中的mem成员以及port成员中,uio设备就知道了网卡的物理以及io空间。应用层访问这个uio设备的物理空间以及io空间,就相当于访问pci设备的物理以及io空间;本质上就是将pci网卡的空间暴露给uio设备。

int igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{//将pci内存,端口映射给uio设备struct rte_uio_pci_dev *udev;err = igbuio_setup_bars(dev, &udev->info);
}
static int igbuio_setup_bars(struct pci_dev *dev, struct uio_info *info)
{//pci内存,端口映射给uio设备for (i = 0; i != sizeof(bar_names) / sizeof(bar_names[0]); i++) {if (pci_resource_len(dev, i) != 0 && pci_resource_start(dev, i) != 0) {flags = pci_resource_flags(dev, i);if (flags & IORESOURCE_MEM) {//暴露pci的内存空间给uio设备ret = igbuio_pci_setup_iomem(dev, info, iom,  i, bar_names[i]);} else if (flags & IORESOURCE_IO) {//暴露pci的io空间给uio设备ret = igbuio_pci_setup_ioport(dev, info, iop,  i, bar_names[i]);}}}
}

(2)拦截硬件中断:为了减掉内核中冗余的数据处理流程,应用程序要hook网卡的中断,从源头开始拦截网卡数据!当硬件中断触发时,才不会一直触发内核去执行中断回调。也就是通过这种方式,才能在应用层实现硬件中断处理过程。注意:这里说的中断仅是控制中断,而不是报文收发的数据中断,数据中断是不会走到这里来的,因为在pmd开启中断时,没有设置收发报文的中断掩码,只注册了网卡状态改变的中断掩码;hook中断的代码如下:

int igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{//填充uio信息udev->info.name = "igb_uio";udev->info.version = "0.1";udev->info.handler = igbuio_pci_irqhandler;        //硬件控制中断的入口,劫持原来的硬件中断udev->info.irqcontrol = igbuio_pci_irqcontrol;    //应用层开关中断时被调用,用于是否开始中断
}
static irqreturn_t igbuio_pci_irqhandler(int irq, struct uio_info *info)
{if (udev->mode == RTE_INTR_MODE_LEGACY && !pci_check_and_mask_intx(udev->pdev)){return IRQ_NONE;}//返回IRQ_HANDLED时,linux uio框架会唤醒等待uio中断的进程。注册到epoll的uio中断事件就会被调度/* Message signal mode, no share IRQ and automasked */return IRQ_HANDLED;
}
static int igbuio_pci_irqcontrol(struct uio_info *info, s32 irq_state)
{//调用内核的api来开关中断if (udev->mode == RTE_INTR_MODE_LEGACY){pci_intx(pdev, !!irq_state);}else if (udev->mode == RTE_INTR_MODE_MSIX)\{list_for_each_entry(desc, &pdev->msi_list, list)igbuio_msix_mask_irq(desc, irq_state);}
}

2、内存池:传统应用要使用内存时,一般都是调用malloc让操作系统在堆上分配。这样做有两点弊端:

  • 进入内核要切换上下文

  • 操作系统通过buddy&slab算法找合适的空闲内存

所以频繁调用malloc会严重拉低效率!如果不频繁调用malloc,怎么处理频繁收到和需要发送的报文数据了?dpdk采用的是内存池的技术:即在huge page内存中开辟一个连续的大缓冲区当做内存池!同时提供rte_mempool_get从内存池中获取内存空间。也可调用rte_mempool_put将不再使用的内存空间放回到内存池中。从这里就能看出:dpdk自己从huge page处维护了一大块内存供应用程序使用,应用程序不再需要通过系统调用从操作系统申请内存了!

(1)内存池的创建,在rte_mempool_create接口中完成。这个接口主要是在大页内存中开辟一个连续的大缓冲区当做内存池,然后将这个内存池进行分割,头部为struct rte_mempool内存池结构; 紧接着是内存池的私有结构大小,这个由应用层自己设置,每个创建内存池的应用进程都可以指定不同的私有结构; 最后是多个连续的对象元素,这些对象元素都是处于同一个内存池中。每个对象元素又有对象的头部,对象的真实数据区域,对象的尾部组成。这里所说的对象元素,其实就是应用层要开辟的真实数据空间,例如应用层自己定义的结构体变量等;本质上是dpdk自己实现了一套内存的管理办法,其作用和linux的buddy&slab是一样的,没本质区别!整个内存池图示如下:

每创建一个内存池,都会创建一个链表节点,然后插入到链表中,因此这个链表记录着当前系统创建了多少内存池。核心代码如下:

//创建内存池链表节点
te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
//内存池链表节点插入到内存池链表中
te->data = (void *) mp;
RTE_EAL_TAILQ_INSERT_TAIL(RTE_TAILQ_MEMPOOL, rte_mempool_list, te);

所以说内存池可能不止1个,会有多个!在内存池中,内存被划分成了N多的对象。应用程序要申请内存时,怎么知道哪些对象空闲可以用,哪些对象已经被占用了?当对象元素初始化完成后,会把对象指针放入ring队列,所以说ring队列的所有对象指针都是可以使用的!应用程序要申请内存时,可以调用rte_mempool_get接口从ring队列中获取,也就是出队; 使用完毕后调用rte_mempool_put将内存释放回收时,也是将要回收的内存空间对应的对象指针放到这个ring队列中,也就是入队!

(2)具体分配内存时的步骤:

  • 现代cpu基本都是多核的,多个cpu同时在内存池申请内存时无法避免涉及到互斥,会在一定程度上影响分配的效率,所以每个cpu自己都有自己的“自留地”,会优先在自己的“自留地”申请内存;

  • 如果“自留地”的内存已耗尽,才会继续去内存池申请内存!核心代码如下:

int rte_mempool_get(struct rte_mempool *mp, void **obj_table, unsigned n)
{
#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0//从当前cpu应用层缓冲区中获取cache = &mp->local_cache[lcore_id];cache_objs = cache->objs;for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++){*obj_table = cache_objs[len];}return 0;
#endif/* get remaining objects from ring *///直接从ring队列中获取ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
}

释放内存的步骤和申请类似:

  • 先查看cpu的“自留地”是否还有空间。如果有,就先把释放的对象指针放在“自留地”;

  • 如果“自留地”没空间了,再把释放的对象指针放在内存池!核心代码如下:

int rte_mempool_put(struct rte_mempool *mp, void **obj_table, unsigned n)
{
#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0//在当前cpu本地缓存有空间的场景下, 先放回到本地缓存。cache = &mp->local_cache[lcore_id];cache_objs = &cache->objs[cache->len];for (index = 0; index < n; ++index, obj_table++){cache_objs[index] = *obj_table;}//缓冲达到阈值,刷到队列中if (cache->len >= flushthresh) {rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size], cache->len - cache_size);cache->len = cache_size;}return 0
#endif//直接放回到ring队列rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
}

注意:这里的ring是环形无锁队列!

3、Poll mode driver: 不论何总形式的io,接收方获取数据的方式有两种:

  • 被动接收中断的唤醒:典型如网卡收到数据,通过硬件中断通知操作系统去处理;操作系统收到数据后会唤醒休眠的进程继续处理数据

  • 轮询 poll:写个死循环不停的检查内存地址是否有新数据到了!

在 x86 体系结构中,一次中断处理需要将 CPU 的状态寄存器保存到堆栈,并运行中断handler,最后再将保存的状态寄存器信息从堆栈中恢复,整个过程需要至少 300 个处理器时钟周期!所以dpdk果断抛弃了中断,转而使用轮询方式!整个流程大致是这样的:内核态的UIO Driver hook了网卡发出的中断信号,然后由用户态的 PMD Driver 采用主动轮询的方式。除了链路状态通知仍必须采用中断方式以外(因为网卡发出硬件中断才能触发执行hook代码的嘛,这个容易理解吧?),均使用无中断方式直接操作网卡设备的接收和发送队列。整体流程大致如下:UIO hook了网卡的中断,网卡收到数据后“被迫”执行hook代码!先是通过UIO把网卡的存储地址映射到/dev/uio文件,而后应用程序通过PMD轮询检查文件是否有新数据到来!期间也使用mmap把应用的虚拟地址映射到网卡的物理地址,减少数据的拷贝转移!

总的来说:UIO+PMD,前者旁路了内核,后者主动轮询避免了硬中断,DPDK 从而可以在用户态进行收发包的处理。带来了零拷贝(Zero Copy)、无系统调用(System call)的优化。同时,还避免了软中断的异步处理,也减少了上下文切换带来的 Cache Miss!轮询收报核心代码如下:

/*PMD轮询接收数据包*/
uint16_t
eth_em_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,uint16_t nb_pkts)
{/* volatile防止编译器优化,每次使用必须又一次从memory中取而不是用寄存器的值 */volatile struct e1000_rx_desc *rx_ring;volatile struct e1000_rx_desc *rxdp;//指向rx ring中某个e1000_rx_desc描述符struct em_rx_queue *rxq;//整个接收队列struct em_rx_entry *sw_ring;//指向描述符队列的头部,根据rx tail来偏移struct em_rx_entry *rxe;//指向sw ring中具体的entrystruct rte_mbuf *rxm;//entry里的rte mbuf/*是new mbuf,新申请的mbuf,当rxm从ring中取出后,需要用nmb再挂上去,更新对应rx ring和sw ring中的值,为下一次收包做准备*/struct rte_mbuf *nmb;struct e1000_rx_desc rxd;//具体的非指针描述符uint64_t dma_addr;uint16_t pkt_len;uint16_t rx_id;uint16_t nb_rx;uint16_t nb_hold;uint8_t status;rxq = rx_queue;nb_rx = 0;nb_hold = 0;//初始化临时变量,要开始遍历队列了rx_id = rxq->rx_tail;rx_ring = rxq->rx_ring;sw_ring = rxq->sw_ring;/* 一次性收32个报文 */while (nb_rx < nb_pkts) {/** The order of operations here is important as the DD status* bit must not be read after any other descriptor fields.* rx_ring and rxdp are pointing to volatile data so the order* of accesses cannot be reordered by the compiler. If they were* not volatile, they could be reordered which could lead to* using invalid descriptor fields when read from rxd.*//* 当前报文的descriptor */rxdp = &rx_ring[rx_id];status = rxdp->status; /* 结束标记,必须首先读取 *//*检查状态是否为dd, 不是则说明驱动还没有把报文放到接收队列,直接退出*/if (! (status & E1000_RXD_STAT_DD))break;rxd = *rxdp; /* 复制一份 *//** End of packet.** If the E1000_RXD_STAT_EOP flag is not set, the RX packet is* likely to be invalid and to be dropped by the various* validation checks performed by the network stack.** Allocate a new mbuf to replenish the RX ring descriptor.* If the allocation fails:*    - arrange for that RX descriptor to be the first one*      being parsed the next time the receive function is*      invoked [on the same queue].**    - Stop parsing the RX ring and return immediately.** This policy do not drop the packet received in the RX* descriptor for which the allocation of a new mbuf failed.* Thus, it allows that packet to be later retrieved if* mbuf have been freed in the mean time.* As a side effect, holding RX descriptors instead of* systematically giving them back to the NIC may lead to* RX ring exhaustion situations.* However, the NIC can gracefully prevent such situations* to happen by sending specific "back-pressure" flow control* frames to its peer(s).*/PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u ""status=0x%x pkt_len=%u",(unsigned) rxq->port_id, (unsigned) rxq->queue_id,(unsigned) rx_id, (unsigned) status,(unsigned) rte_le_to_cpu_16(rxd.length));nmb = rte_mbuf_raw_alloc(rxq->mb_pool);if (nmb == NULL) {PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u ""queue_id=%u",(unsigned) rxq->port_id,(unsigned) rxq->queue_id);rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;break;}/* 表示当前descriptor被上层软件占用 */nb_hold++;/* 当前收到的mbuf */rxe = &sw_ring[rx_id];/* 收包位置,假设超过环状数组则回滚 */rx_id++;if (rx_id == rxq->nb_rx_desc)rx_id = 0;/* mbuf加载cache下次循环使用 *//* Prefetch next mbuf while processing current one. */rte_em_prefetch(sw_ring[rx_id].mbuf);/** When next RX descriptor is on a cache-line boundary,* prefetch the next 4 RX descriptors and the next 8 pointers* to mbufs.*//* 取下一个descriptor,以及mbuf指针下次循环使用 *//* 一个cache line是4个descriptor大小(64字节) */if ((rx_id & 0x3) == 0) {rte_em_prefetch(&rx_ring[rx_id]);rte_em_prefetch(&sw_ring[rx_id]);}/* Rearm RXD: attach new mbuf and reset status to zero. */rxm = rxe->mbuf;rxe->mbuf = nmb;dma_addr =rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));rxdp->buffer_addr = dma_addr;rxdp->status = 0;/* 重置当前descriptor的status *//** Initialize the returned mbuf.* 1) setup generic mbuf fields:*    - number of segments,*    - next segment,*    - packet length,*    - RX port identifier.* 2) integrate hardware offload data, if any:*    - RSS flag & hash,*    - IP checksum flag,*    - VLAN TCI, if any,*    - error flags.*/pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.length) -rxq->crc_len);rxm->data_off = RTE_PKTMBUF_HEADROOM;rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);rxm->nb_segs = 1;rxm->next = NULL;rxm->pkt_len = pkt_len;rxm->data_len = pkt_len;rxm->port = rxq->port_id;rxm->ol_flags = rx_desc_status_to_pkt_flags(status);rxm->ol_flags = rxm->ol_flags |rx_desc_error_to_pkt_flags(rxd.errors);/* Only valid if PKT_RX_VLAN set in pkt_flags */rxm->vlan_tci = rte_le_to_cpu_16(rxd.special);/** Store the mbuf address into the next entry of the array* of returned packets.*//* 把收到的mbuf返回给用户 */rx_pkts[nb_rx++] = rxm;}/* 收包位置更新 */rxq->rx_tail = rx_id;/** If the number of free RX descriptors is greater than the RX free* threshold of the queue, advance the Receive Descriptor Tail (RDT)* register.* Update the RDT with the value of the last processed RX descriptor* minus 1, to guarantee that the RDT register is never equal to the* RDH register, which creates a "full" ring situtation from the* hardware point of view...*/nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);if (nb_hold > rxq->rx_free_thresh) {PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u ""nb_hold=%u nb_rx=%u",(unsigned) rxq->port_id, (unsigned) rxq->queue_id,(unsigned) rx_id, (unsigned) nb_hold,(unsigned) nb_rx);rx_id = (uint16_t) ((rx_id == 0) ?(rxq->nb_rx_desc - 1) : (rx_id - 1));E1000_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);nb_hold = 0;}rxq->nb_rx_hold = nb_hold;return nb_rx;
}

接收报文的整理流程梳理如下图所示:

  • DMA控制器控制报文一个个写到rx ring中接收描述符指定的IO虚拟内存中,对应的实际内存应该就是mbuf;

  • 接收函数用rx tail变量控制不停地读取rx ring中的描述符和sw ring中的mbuf,并申请新的mbuf放入sw ring中,更新rx ring中的buffer addr

  • 最后把读取的mbuf返回给应用程序。

4、线程亲和性

一个cpu上可以运行多个线程, 由linux内核来调度各个线程的执行。内核在调度线程时,会进行上下文切换,保存线程的堆栈等信息, 以便这个线程下次再被调度执行时,继续从指定的位置开始执行。然而上下文切换是需要耗费cpu资源的的。多核体系的CPU,物理核上的线程来回切换,会导致L1/L2 cache命中率的下降。同时NUMA架构下,如果操作系统调度线程的时候,跨越了NUMA节点,将会导致大量的L3 cache的丢失。Linux对线程的亲和性是有支持的, 如果将线程和cpu进行绑定的话,线程会一直在指定的cpu上运行,不会被操作系统调度到别的cpu上,线程之间互相独立工作而不会互相扰完,节省了操作系统来回调度的时间。目前DPDK通过把线程绑定到cpu的方法来避免跨核任务中的切换开销。

线程绑定cpu物理核的函数如下:

/* set affinity for current EAL thread */
static int
eal_thread_set_affinity(void)
{unsigned lcore_id = rte_lcore_id();/* acquire system unique id  */rte_gettid();/* update EAL thread core affinity */return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset);
}

继续往下走:

/*根据前面的rte_cpuset_t ,设置tid的绑定关系存储thread local socket_id存储thread local rte_cpuset_t
*/
int
rte_thread_set_affinity(rte_cpuset_t *cpusetp)
{int s;unsigned lcore_id;pthread_t tid;tid = pthread_self();//得到当前线程id//绑定cpu和线程s = pthread_setaffinity_np(tid, sizeof(rte_cpuset_t), cpusetp);if (s != 0) {RTE_LOG(ERR, EAL, "pthread_setaffinity_np failed\n");return -1;}/* store socket_id in TLS for quick access *///socketid存放到线程本地空间,便于快速读取RTE_PER_LCORE(_socket_id) =eal_cpuset_socket_id(cpusetp);/* store cpuset in TLS for quick access *///cpu信息存放到cpu本地空间,便于快速读取memmove(&RTE_PER_LCORE(_cpuset), cpusetp,sizeof(rte_cpuset_t));lcore_id = rte_lcore_id();//获取线程绑定的CPUif (lcore_id != (unsigned)LCORE_ID_ANY) {//如果不相等,就更新lcore配置/* EAL thread will update lcore_config */lcore_config[lcore_id].socket_id = RTE_PER_LCORE(_socket_id);memmove(&lcore_config[lcore_id].cpuset, cpusetp,sizeof(rte_cpuset_t));}return 0;
}

继续往下走:

int
pthread_setaffinity_np(pthread_t thread, size_t cpusetsize,const rte_cpuset_t *cpuset)
{if (override) {/* we only allow affinity with a single CPU */if (CPU_COUNT(cpuset) != 1)return POSIX_ERRNO(EINVAL);/* we only allow the current thread to sets its own affinity */struct lthread *lt = (struct lthread *)thread;if (lthread_current() != lt)return POSIX_ERRNO(EINVAL);/* determine the CPU being requested */int i;for (i = 0; i < LTHREAD_MAX_LCORES; i++) {if (!CPU_ISSET(i, cpuset))continue;break;}/* check requested core is allowed */if (i == LTHREAD_MAX_LCORES)return POSIX_ERRNO(EINVAL);/* finally we can set affinity to the requested lcore 前面做了大量的检查和容错,这里终于开始绑定cpu了*/lthread_set_affinity(i);return 0;}return _sys_pthread_funcs.f_pthread_setaffinity_np(thread, cpusetsize,cpuset);
}

绑定cpu的方法也简单:本质就是个上下文切换

/** migrate the current thread to another scheduler running* on the specified lcore.*/
int lthread_set_affinity(unsigned lcoreid)
{struct lthread *lt = THIS_LTHREAD;struct lthread_sched *dest_sched;if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))return POSIX_ERRNO(EINVAL);DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);dest_sched = schedcore[lcoreid];if (unlikely(dest_sched == NULL))return POSIX_ERRNO(EINVAL);if (likely(dest_sched != THIS_SCHED)) {lt->sched = dest_sched;lt->pending_wr_queue = dest_sched->pready;//真正切换线程到指定cpu运行的代码_affinitize();return 0;}return 0;
}
tatic __rte_always_inline void
_affinitize(void);
static inline void
_affinitize(void)
{struct lthread *lt = THIS_LTHREAD;DIAG_EVENT(lt, LT_DIAG_LTHREAD_SUSPENDED, 0, 0);ctx_switch(&(THIS_SCHED)->ctx, &lt->ctx);
}
void
ctx_switch(struct ctx *new_ctx __rte_unused, struct ctx *curr_ctx __rte_unused)
{/* SAVE CURRENT CONTEXT */asm volatile (/* Save SP */"mov x3, sp\n""str x3, [x1, #0]\n"/* Save FP and LR */"stp x29, x30, [x1, #8]\n"/* Save Callee Saved Regs x19 - x28 */"stp x19, x20, [x1, #24]\n""stp x21, x22, [x1, #40]\n""stp x23, x24, [x1, #56]\n""stp x25, x26, [x1, #72]\n""stp x27, x28, [x1, #88]\n"/** Save bottom 64-bits of Callee Saved* SIMD Regs v8 - v15*/"stp d8, d9, [x1, #104]\n""stp d10, d11, [x1, #120]\n""stp d12, d13, [x1, #136]\n""stp d14, d15, [x1, #152]\n");/* RESTORE NEW CONTEXT */asm volatile (/* Restore SP */"ldr x3, [x0, #0]\n""mov sp, x3\n"/* Restore FP and LR */"ldp x29, x30, [x0, #8]\n"/* Restore Callee Saved Regs x19 - x28 */"ldp x19, x20, [x0, #24]\n""ldp x21, x22, [x0, #40]\n""ldp x23, x24, [x0, #56]\n""ldp x25, x26, [x0, #72]\n""ldp x27, x28, [x0, #88]\n"/** Restore bottom 64-bits of Callee Saved* SIMD Regs v8 - v15*/"ldp d8, d9, [x0, #104]\n""ldp d10, d11, [x0, #120]\n""ldp d12, d13, [x0, #136]\n""ldp d14, d15, [x0, #152]\n");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/577123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MongoDB数据库本地部署并结合内网穿透实现navicat公网访问

文章目录 前言1. 安装数据库2. 内网穿透2.1 安装cpolar内网穿透2.2 创建隧道映射2.3 测试随机公网地址远程连接 3. 配置固定TCP端口地址3.1 保留一个固定的公网TCP端口地址3.2 配置固定公网TCP端口地址3.3 测试固定地址公网远程访问 前言 MongoDB是一个基于分布式文件存储的数…

Jave EE 网络原理之网络层与数据链路层

文章目录 1. 网络层1.1 IP 协议1.1.1 协议头格式1.1.2 地址管理1.1.2.1 认识 IP 地址 1.1.3 路由选择 2. 数据链路层2.1 认识以太网2.1.1 以太网帧格式2.1.2 DNS 应用层协议 1. 网络层 网络层要做的事情&#xff0c;主要是两个方面 地址管理 &#xff08;制定一系列的规则&am…

解决ELement-UI三级联动数据不回显

目录 一.处理数据时使用this.$set方法来动态地设置实例中的属性&#xff0c;以确保其响应式。 二.检查数据格式是否正确 三.绑定v-if 确保每次执行 四.完整代码 一.处理数据时使用this.$set方法来动态地设置实例中的属性&#xff0c;以确保其响应式。 二.检查数据格式是否正确…

BFS解决FloodFill算法相关leetcode算法题

文章目录 1.图像渲染2.岛屿数量3.岛屿的最大面积4.被围绕的区域 1.图像渲染 图像渲染 class Solution {int dx[4] {0,0,1,-1};int dy[4] {1,-1,0,0}; public:vector<vector<int>> floodFill(vector<vector<int>>& image, int sr, int sc, int…

2023年12月【考试战报】|ORACLE OCP 19C考试通过

2023年10月【考试战报】|ORACLE OCP 19C考试通过-CSDN博客文章浏览阅读122次。自OCP认证进入中国以来&#xff0c;越来越被大多数DBA所认可&#xff0c;也越来越被企业所重视&#xff0c;90%以上DBA深造&#xff0c;都会选择OCP认证。随着OCP认证在全国范围内的普及&#xff0c…

口袋参谋:如何布局买家评论,提高宝贝转化率?

​宝贝转化低&#xff1f; 除了和关键词、主图有关系&#xff0c;与买家评论也有一定的关联&#xff01; 一个差评&#xff0c;足以让宝贝的转化跌落谷底。只要网购&#xff0c;评价&#xff0c;永远是躲不掉的一环。 作为买家&#xff1a; 评价的作用&#xff0c;是用来甄…

Spring之国际化:i18n

学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰。各位小伙伴&#xff0c;如果您&#xff1a; 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持&#xff0c;想组团高效学习… 想写博客但无从下手&#xff0c;急需…

Jupyter Notebook的安装及在网页端和VScode中使用教程(详细图文教程)

目录 一、Jupyter Notebook1.1 组成组件1.2 优点1.3 常规用途 二、安装及使用2.1 网页端2.1.1 安装Jupyter Notebook2.1.2 检验是否安装成功2.1.3 启动Jupyter Notebook2.1.4 使用Jupyter Notebook 2.2 VScode中安装及使用2.2.1 安装Jupyter2.2.2 使用Jupyter 三、常用命令3.1 …

PaddleOCR 的使用,极简介绍

安装 参考github的官网就可以&#xff1a; github链接 简单的说&#xff0c;就是两句话&#xff1a; python3 -m pip install paddlepaddle-gpu -i https://mirror.baidu.com/pypi/simple pip install "paddleocr>2.0.1" # 推荐使用2.0.1版本 Python下的使用…

[环境配置]win10关闭病毒和威胁防护防止乱删软件

搜索栏输入病毒和威胁即可看到 如果没有搜到您可以从菜单栏进到到Windows设置 选择更新和安全 点击后进到windows安全中心&#xff0c;随后进到到病毒和威胁防护 关闭所有选项

软件测试必问的33个面试题

1.你为什么选择软件测试行业 因为之前有了解软件测试这个行业&#xff0c;觉得他的发展前景很好。 2.根据你以前的工作经验描述一下软件开发、测试过程&#xff0c;由那些角色负责&#xff0c;你做什么 要有架构师、开发经理、测试经理、程序员、测试员。我在里面主要是负责所…

Git 分布式版本控制系统(序章1)

第一章 Git 分布式版本控制系统 为什么学Git? 某些企业面试需要掌握Git&#xff0c;同时&#xff0c;也方便管理自己的Qt项目。 一、Git 客户端下载&#xff08;Windows&#xff09; 下载地址 https://gitee.com/all-about-git#git-%E5%A4%A7%E5%85%A8 二、Git 的特点 分支…

自动驾驶中的“雷达”

自动驾驶中有好几种雷达&#xff0c;新手可能会蒙蔽&#xff0c;这里统一介绍一下它们。 首先&#xff0c;所有雷达的原理都是发射波&#xff0c;接收回波&#xff0c;并通过发射和接收的时间差以及波的速度计算距离。只不过发射的波不同&#xff0c;功能也不同。 激光雷达 …

unity HoloLens2开发,使用Vuforia识别实体 触发交互(二)(有dome)

提示&#xff1a;文章有错误的地方&#xff0c;还望诸位大神不吝指教&#xff01; 文章目录 前言一、打包到HoloLens二、Vuforia相关1.配置识别框2.制作一个半透明识别框&#xff1a;3.设置如下4.问题 四 HoloLens2 问题总结 前言 我使用的utniy 版本&#xff1a;Unity 2021.3…

广州华锐互动VRAR:VR安全模拟驾驶让顾客身临其境感受真实试驾体验

随着科技的不断发展&#xff0c;汽车行业也在不断地进行创新。从电动汽车到自动驾驶&#xff0c;再到如今的虚拟现实技术&#xff0c;汽车行业的未来充满了无限的可能性。而在这些创新中&#xff0c;VR安全模拟驾驶无疑是最具吸引力的一项。通过戴上一副虚拟现实眼镜&#xff0…

本机ping不通虚拟机

windows下finall shell连不上虚拟机了&#xff0c;之前是可以的&#xff0c;然后ping虚拟机&#xff0c;发现也ping不通&#xff0c;随后到处找问题。 在本地部分&#xff0c;控制面板 ——>网络和Internet——>网络连接 &#xff0c; 可以看到 VMnet1和Vmnet8虽然都是已…

取证练习(一)PC+手机,服务器未完

链接&#xff1a;https://pan.baidu.com/s/1KlkPwzWm7dNO2iRGoTsE7Q?pwdxyxy 提取码&#xff1a;xyxy –来自百度网盘超级会员V3的分享 每道题5分&#xff0c;共计200 一、请检查窝点中的手机检材&#xff0c;回答以下问题 1、 该OPPO手机的IMEI是&#xff1a; A. 8603700…

Python爬虫的作用及工具和反爬机制,爬虫新手入门篇

文章目录 一 什么是爬虫二 爬虫工具三. 反爬虫问题Python爬虫技术资源分享1、Python所有方向的学习路线2、学习软件3、入门学习视频4、实战案例5、清华编程大佬出品《漫画看学Python》6、Python副业兼职与全职路线 一 什么是爬虫 爬虫能做什么 政治角逐 2016年这场美国总统竞…

选择激光打标机:为您的产品增添独特标识

激光打标机是一种高科技的打标设备&#xff0c;以其独特的标记效果和精度&#xff0c;逐渐成为了工业制造领域中不可或缺的一部分。选择激光打标机&#xff0c;不仅可以为您的产品增添独特的标识&#xff0c;还可以提升品牌形象&#xff0c;增强产品附加值。 一、激光打标机的独…