一、先说FIFO
队列是常见的一种数据结构,简单看来就是一段数据缓存区,可以存储一定量的数据,先存进来的数据会被先取出,First In Fist Out,就是FIFO。
FIFO主要用于缓冲速度不匹配的通信。
例如生产者(数据产生者)可能在短时间内生成大量数据,导致消费者(数据使用方)无法立即处理完,那么就需要用到队列。生产者可以突然生成大量数据存到队列中,然后就去休息,消费者再有条不紊地将数据一条条取出解析。
再具体点的例子,通信接口驱动接收到通信数据时,需要将其存入队列,然后马上再回去接收或等待新数据,相关的通信解析程序只需要从队列中取数据即可。如果驱动每次接收到数据都要等待解析,则有可能导致新数据没能及时接收而丢失。
除了缓冲速度不匹配的通信外,FIFO也是“多生产者-单消费者”场景的一种解决方案。
二、内核的kfifo 特别的地方
kfifo是linux内核的对队列功能的实现。在内核中,它被称为无锁环形队列。
所谓无锁,就是当只有一个生产者和只有一个消费者时,操作fifo不需要加锁。这是因为kfifo出队和入队时,不会改动到相同的变量。
例如,如果让我们自己实现一个fifo,大家容易想到使用一个count来记录fifo中的数据量,入队一个则加一个,出队一个则减一个:
这种情况肯定是需要对count加锁的。那kfifo是怎么做的呢?
kfifo使用了in和out两个变量分别作为入队和出队的索引:
- 入队n个数据时,in变量就+n
- 出队k个数据时,out变量就+k
- out不允许大于in(out等于in时表示fifo为空)
- in不允许比out大超过fifo空间(比如上图,in最多比out多8,此时表示fifo已满)
如果in和out大于fifo空间了,比如上图中的8,会减去8后重新开始吗?
不,这两个索引会一直往前加,不轻易回头,为出入队操作省下了几个指令周期。
那入队和出队的数据从哪里开始存储/读取呢,我们第一时间会想到,把 in/out 用“%”对fifo大小取余就行了,是吧?
不,取余这种耗费资源的运算,内核开发者怎会轻易采用呢,kfifo的办法是,把 in/out 与上fifo->mask。这个mask等于fifo的空间大小减一(其要求fifo的空间必须是2的次方大小)。这个“与”操作可比取余操作快得多了。
由此,kfifo就实现了“无锁”“环形”队列。
了解了上述原理,我们就能意识到,这个无锁只是针对“单生产者-单消费者”而言的。“多生产者”时,则需要对入队操作进行加锁;同样的,“多消费者”时需要对出队操作进行加锁。
特别的地方:它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发操作,而不需要任何加锁行为,就可以保证kfifo的线程安全
三、kfifo的实现
struct kfifo {unsigned char *buffer; /* the buffer holding the data */unsigned int size; /* the size of the allocated buffer */unsigned int in; /* data is added at offset (in % size) */unsigned int out; /* data is extracted from off. (out % size) */spinlock_t *lock; /* protects concurrent modifications */
};
这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:
buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它向上扩展成2的幂
lock: 如果使用不能保证任何时间最多只有一个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一起构成一个循环队列。 in指向buffer中队头,而且out指向buffer中的队尾,它的结构如示图如下:
3.2 kfifo 的功能
- 只支持一个读者和一个读者并发操作
- 无阻塞的读写操作,如果空间不够,则返回实际访问空间
kfifo_alloc 分配kfifo内存和初始化工作:
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{unsigned char *buffer;struct kfifo *ret;/** round up to the next power of 2, since our 'let the indices* wrap' tachnique works only in this case.*/if (size & (size - 1)) {BUG_ON(size > 0x80000000);size = roundup_pow_of_two(size);}buffer = kmalloc(size, gfp_mask);if (!buffer)return ERR_PTR(-ENOMEM);ret = kfifo_init(buffer, size, gfp_mask, lock);if (IS_ERR(ret))kfree(buffer);return ret;
}
1.判断一个数是不是2的次幂 :
kfifo要保证其缓存空间的大小为2的次幂,如果不是则向上取整为2的次幂。其对于2的次幂的判断方式也是很巧妙的。如果一个整数n是2的次幂,则二进制模式必然是1000…,而n-1的二进制模式则是0111…,也就是说n和n-1的每个二进制位都不相同,例如:8(1000)和7(0111);n不是2的次幂,则n和n-1的二进制必然有相同的位都为1的情况,例如:7(0111)和6(0110)。这样就可以根据 n & (n-1)的结果来判断整数n是不是2的次幂,实现如下:
static inline bool is_power_of_2(uint32_t n)
{return (n != 0 && ((n & (n - 1)) == 0));
}
2.将数字向上取整为2的次幂 :
如果设定的缓冲区大小不是2的次幂,则向上取整为2的次幂,例如:设定为5,则向上取为8。上面提到整数n是2的次幂,则其二进制模式为100…,故如果正数k不是n的次幂,只需找到其最高的有效位1所在的位置(从1开始计数)pos,然后1 << pos即可将k向上取整为2的次幂。实现如下:
static inline uint32_t roundup_power_of_2(uint32_t a)
{if (a == 0)return 0;uint32_t position = 0;for (int i = a; i != 0; i >>= 1)position++;return static_cast<uint32_t>(1 << position);
}
fifo->in & (fifo->size - 1) 再比如这种写法取模,获取已用的大小。这样用逻辑与的方式相较于加减法更有效率
3.3 _kfifo_put与__kfifo_get详解
__kfifo_put是入队操作,它先将数据放入buffer里面,最后才修改in参数;
__kfifo_get是出队操作,它先将数据从buffer中移走,最后才修改out。
unsigned int __kfifo_put(struct kfifo *fifo,unsigned char *buffer, unsigned int len)
{unsigned int l;len = min(len, fifo->size - fifo->in + fifo->out);/** Ensure that we sample the fifo->out index -before- we* start putting bytes into the kfifo.*/smp_mb();/* first put the data starting from fifo->in to buffer end */l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);/* then put the rest (if any) at the beginning of the buffer */memcpy(fifo->buffer, buffer + l, len - l);/** Ensure that we add the bytes to the kfifo -before-* we update the fifo->in index.*/smp_wmb();fifo->in += len;return len;
}
6行,环形缓冲区的剩余容量为fifo->size - fifo->in + fifo->out,让写入的长度取len和剩余容量中较小的,避免写越界;
13行,加内存屏障,保证在开始放入数据之前,fifo->out取到正确的值(另一个CPU可能正在改写out值)
16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1),所以fifo->size - (fifo->in & (fifo->size - 1)) 即位 fifo->in 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为需要拷贝l 字节到fifo->buffer + fifo->in的位置上。
17行,拷贝l 字节到fifo->buffer + fifo->in的位置上,如果l = len,则已拷贝完成,第20行len – l 为0,将不执行,如果l = fifo->size - (fifo->in & (fifo->size - 1)) ,则第20行还需要把剩下的 len – l 长度拷贝到buffer的头部。
27行,加写内存屏障,保证in 加之前,memcpy的字节已经全部写入buffer,如果不加内存屏障,可能数据还没写完,另一个CPU就来读数据,读到的缓冲区内的数据不完全,因为读数据是通过 in – out 来判断的。
29行,注意这里 只是用了 fifo->in += len而未取模,这就是kfifo的设计精妙之处,这里用到了unsigned int的溢出性质,当in 持续增加到溢出时又会被置为0,这样就节省了每次in向前增加都要取模的性能,锱铢必较,精益求精,让人不得不佩服。
unsigned int __kfifo_get(struct kfifo *fifo,unsigned char *buffer, unsigned int len)
{unsigned int l;len = min(len, fifo->in - fifo->out);/** Ensure that we sample the fifo->in index -before- we* start removing bytes from the kfifo.*/smp_rmb();/* first get the data from fifo->out until the end of the buffer */l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);/* then get the rest (if any) from the beginning of the buffer */memcpy(buffer + l, fifo->buffer, len - l);/** Ensure that we remove the bytes from the kfifo -before-* we update the fifo->out index.*/smp_mb();fifo->out += len;return len;
}
6行,可去读的长度为fifo->in – fifo->out,让读的长度取len和剩余容量中较小的,避免读越界;
13行,加读内存屏障,保证在开始取数据之前,fifo->in取到正确的值(另一个CPU可能正在改写in值)
16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->out % kfifo->size 可以转化为 kfifo->out & (kfifo->size – 1),所以fifo->size - (fifo->out & (fifo->size - 1)) 即位 fifo->out 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为从fifo->buffer + fifo->in到末尾所要去读的长度。
17行,从fifo->buffer + fifo->out的位置开始读取l长度,如果l = len,则已读取完成,第20行len – l 为0,将不执行,如果l =fifo->size - (fifo->out & (fifo->size - 1)) ,则第20行还需从buffer头部读取 len – l 长。
27行,加内存屏障,保证在修改out前,已经从buffer中取走了数据,如果不加屏障,可能先执行了增加out的操作,数据还没取完,令一个CPU可能已经往buffer写数据,将数据破坏,因为写数据是通过fifo->size - (fifo->in & (fifo->size - 1))来判断的 。
29行,注意这里 只是用了 fifo->out += len 也未取模,同样unsigned int的溢出性质,当out 持续增加到溢出时又会被置为0,如果in先溢出,出现 in < out 的情况,那么 in – out 为负数(又将溢出),in – out 的值还是为buffer中数据的长度。
图解一下 in 先溢出的情况,size = 64, 写入前 in = 4294967291, out = 4294967279 ,数据 in – out = 12;
写入 数据16个字节,则 in + 16 = 4294967307,溢出为 11,此时 in – out = –4294967268,溢出为28,数据长度仍然正确,由此可见,在这种特殊情况下,这种计算仍然正确,是不是让人叹为观止,妙不可言?
3.4 kfifo_get和kfifo_put无锁并发操作
计算机科学家已经证明,当只有一个读经程和一个写线程并发操作
时,不需要任何额外的锁,就可以确保是线程安全的,也即kfifo使用了无锁编程技术,以提高kernel的并发。
为了避免读者看到写者预计写入,但实际没有写入数据的空间,写者必须保证以下的写入顺序:
1.往[kfifo->in, kfifo->in + len]空间写入数据
2.更新kfifo->in指针为 kfifo->in + len
在操作1完成时,读者是还没有看到写入的信息的,因为kfifo->in没有变化,认为读者还没有开始写操作,只有更新kfifo->in之后,读者才能看到。
那么如何保证1必须在2之前完成,秘密就是使用内存屏障:smp_mb(),smp_rmb(), smp_wmb(),来保证对方观察到的内存操作顺序。
3.5 优点
kfifo优点:
实现单消费者和单生产者的无锁并发访问。多消费者和多生产者的时候还是需要加锁的。
使用与运算in & (size-1)代替模运算
在更新in或者out的值时不做模运算,而是让其自动溢出。这应该是kfifo实现最牛叉的地方了,利用溢出后的值参与运算,并且能够保证结果的正确。溢出运算保证了以下几点:
in - out为缓冲区中的数据长度
size - in + out 为缓冲区中空闲空间
in == out时缓冲区为空
size == (in - out)时缓冲区满了
读完kfifo代码,令我想起那首诗“众里寻他千百度,默然回首,那人正在灯火阑珊处”。不知你是否和我一样,总想追求简洁,高质量和可读性的代码,当用尽各种方法,江郞才尽之时,才发现Linux kernel里面的代码就是我们寻找和学习的对象。