奥格尔巧妙kfifo
Author:Echo Chen(陈斌)
Email:chenb19870707@gmail.com
Date:October 8th, 2014
学不考儒,务掇精华。文不按古,匠心独运。Linux kernal 鬼斧神工,博大精深。让人叹为观止。拍手叫绝。然匠心独运的设计并不是扑朔迷离、盘根错节。真正的匠心独运乃辞简理博、化繁为简,在简洁中昭显优雅和智慧。kfifo就是这样一种数据结构,它就是这样简约高效,匠心独运,妙不可言。以下就跟大家一起探讨学习。
本文分析的原代码版本号 | 2.6.32.63 |
kfifo的头文件 | include/linux/kfifo.h |
kfifo的源文件 | kernel/kfifo.c |
kfifo是一种"First In First Out “数据结构。它採用了前面提到的环形缓冲区来实现,提供一个无边界的字节流服务。
採用环形缓冲区的优点为,当一个数据元素被用掉后。其余数据元素不须要移动其存储位置,从而降低拷贝提高效率。
更重要的是,kfifo採用了并行无锁技术。kfifo实现的单生产/单消费模式的共享队列是不须要加锁同步的。
1: struct kfifo {
2: unsigned char *buffer; /* the buffer holding the data */
3: unsigned int size; /* the size of the allocated buffer */
4: unsigned int in; /* data is added at offset (in % size) */
5: unsigned int out; /* data is extracted from off. (out % size) */
6: spinlock_t *lock; /* protects concurrent modifications */
7: };
buffer | 用于存放数据的缓存 |
size | 缓冲区空间的大小。在初化时,将它向上圆整成2的幂 |
in | 指向buffer中队头 |
out | 指向buffer中的队尾 |
lock | 假设使用不能保证不论什么时间最多仅仅有一个读线程和写线程,必须使用该lock实施同步。 |
它的结构如图:
这看起来与普通的环形缓冲区没有什么区别,可是让人叹为观止的地方就是它巧妙的用 in 和 out 的关系和特性,处理各种操作,以下我们来具体分析。
首先,看一个非常有趣的函数。推断一个数是否为2的次幂,依照一般的思路。求一个数n是否为2的次幂的方法为看 n % 2 是否等于0, 我们知道“取模运算”的效率并没有 “位运算” 的效率高,有兴趣的同学能够自己做下实验。
以下再验证一下这样取2的模的正确性,若n为2的次幂,则n和n-1的二进制各个位肯定不同 (如8(1000)和7(0111))。&出来的结果肯定是0。假设n不为2的次幂,则各个位肯定有同样的 (如7(0111) 和6(0110)),&出来结果肯定为0。是不是非常巧妙?
1: bool is_power_of_2(unsigned long n)
2: {
3: return (n != 0 && ((n & (n - 1)) == 0));
4: }
再看下kfifo内存分配和初始化的代码。前面提到kfifo总是对size进行2次幂的圆整,这种优点不言而喻,能够将kfifo->size取模运算能够转化为与运算。例如以下:
kfifo->in % kfifo->size 能够转化为 kfifo->in & (kfifo->size – 1)
“取模运算”的效率并没有 “位运算” 的效率高还记得不,不放过不论什么一点能够提高效率的地方。
1: struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
2: {
3: unsigned char *buffer;
4: struct kfifo *ret;
5:
6: /*
7: * round up to the next power of 2, since our ‘let the indices
8: * wrap‘ technique works only in this case.
9: */
10: if (!is_power_of_2(size)) {
11: BUG_ON(size > 0x80000000);
12: size = roundup_pow_of_two(size);
13: }
14:
15: buffer = kmalloc(size, gfp_mask);
16: if (!buffer)
17: return ERR_PTR(-ENOMEM);
18:
19: ret = kfifo_init(buffer, size, gfp_mask, lock);
20:
21: if (IS_ERR(ret))
22: kfree(buffer);
23:
24: return ret;
25: }
为什么kfifo实现的单生产/单消费模式的共享队列是不须要加锁同步的呢?天底下没有免费的午餐的道理人人都懂,以下我们就来看看kfifo实现并发无锁的奥秘。
我们知道 编译器编译源码时。会将源码进行优化,将源码的指令进行重排序。以适合于CPU的并行运行。然而,内核同步必须避免指令又一次排序,优化屏障(Optimization barrier)避免编译器的重排序优化操作。保证编译程序时在优化屏障之前的指令不会在优化屏障之后运行。
举个样例,假设多核CPU运行下面程序:
1: a = 1;
2: b = a + 1;
3: assert(b == 2);
如果初始时a和b的值都是0。a处于CPU1-cache中,b处于CPU0-cache中。如果依照以下流程运行这段代码:
1 CPU0运行a=1; |
软件可通过读写屏障强制内存訪问次序。读写屏障像一堵墙,全部在设置读写屏障之前发起的内存訪问,必须先于在设置屏障之后发起的内存訪问之前完毕,确保内存訪问按程序的顺序完毕。Linux内核提供的内存屏障API函数说明例如以下表。内存屏障可用于多处理器和单处理器系统,假设仅用于多处理器系统。就使用smp_xxx函数。在单处理器系统上,它们什么都不要。
smp_rmb |
适用于多处理器的读内存屏障。 |
smp_wmb |
适用于多处理器的写内存屏障。 |
smp_mb |
适用于多处理器的内存屏障。 |
假设对上述代码加上内存屏障,就能保证在CPU0取a时。一定已经设置好了a = 1:
1: void foo(void)
2: {
3: a = 1;
4: smp_wmb();
5: b = a + 1;
6: }
__kfifo_put是入队操作,它先将数据放入buffer中,然后移动in的位置,其源码例如以下:
1: unsigned int __kfifo_put(struct kfifo *fifo,
2: const unsigned char *buffer, unsigned int len)
3: {
4: unsigned int l;
5:
6: len = min(len, fifo->size - fifo->in + fifo->out);
7:
8: /*
9: * Ensure that we sample the fifo->out index -before- we
10: * start putting bytes into the kfifo.
11: */
12:
13: smp_mb();
14:
15: /* first put the data starting from fifo->in to buffer end */
16: l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
17: memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
18:
19: /* then put the rest (if any) at the beginning of the buffer */
20: memcpy(fifo->buffer, buffer + l, len - l);
21:
22: /*
23: * Ensure that we add the bytes to the kfifo -before-
24: * we update the fifo->in index.
25: */
26:
27: smp_wmb();
28:
29: fifo->in += len;
30:
31: return len;
32: }
1: unsigned int __kfifo_get(struct kfifo *fifo,
2: unsigned char *buffer, unsigned int len)
3: {
4: unsigned int l;
5:
6: len = min(len, fifo->in - fifo->out);
7:
8: /*
9: * Ensure that we sample the fifo->in index -before- we
10: * start removing bytes from the kfifo.
11: */
12:
13: smp_rmb();
14:
15: /* first get the data from fifo->out until the end of the buffer */
16: l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
17: memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
18:
19: /* then get the rest (if any) from the beginning of the buffer */
20: memcpy(buffer + l, fifo->buffer, len - l);
21:
22: /*
23: * Ensure that we remove the bytes from the kfifo -before-
24: * we update the fifo->out index.
25: */
26:
27: smp_mb();
28:
29: fifo->out += len;
30:
31: return len;
32: }
6行,可去读的长度为fifo->in – fifo->out,让读的长度取len和剩余容量中较小的,避免读越界;
13行,加读内存屏障。保证在開始取数据之前。fifo->in取到正确的值(还有一个CPU可能正在改写in值)
16行。前面讲到fifo->size已经2的次幂圆整,并且kfifo->out % kfifo->size 能够转化为 kfifo->out & (kfifo->size – 1)。所以fifo->size - (fifo->out & (fifo->size - 1)) 即位 fifo->out 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值。即为从fifo->buffer + fifo->in到末尾所要去读的长度。
kfifo设计静止,妙不可言,但主要为内核提供服务,内存屏障函数也主要为内核提供服务,并未开放出来,可是我们学习到了这样的设计巧妙之处。就能够依葫芦画瓢,写出自己的并发无锁环形缓冲区,这将在下篇文章中给出,至于内存屏障函数的问题,好在gcc 4.2以上的版本号都内置提供__sync_synchronize()这类的函数,效果相差点儿相同。
《眉目传情之并发无锁环形队列的实现》给出自己的并发无锁的实现,有兴趣的朋友能够參考一下。
1.http://blog.csdn.net/xujianqun/article/details/7800813
2.http://zh.wikipedia.org/wiki/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80#.E7.94.A8.E6.B3.95
3.http://blog.csdn.net/linyt/article/details/5764312
-
Echo Chen:Blog.csdn.net/chen19870707
-
版权声明:本文博主原创文章,博客,未经同意不得转载。
原文:http://www.cnblogs.com/gcczhongduan/p/4804159.html