电子说
《Memory Ordering in Modern Microprocessors》该文章和上一篇是同一个作者。该文章对上一篇中第6部分的内容进行了更加详细的说明。3.Java volatile在刚开始学习volatile和内存屏障的时候,在网上搜到很多的资料都是讲java实现的。volatile这个关键字在java和 CC++ 里面有非常大的区别,容易引起误会。主要区别在于,java volatile 具有缓存同步的功能,而 CC++ 没有这个功能,具体原因本文会简单讲下。详细内容参见B站马士兵老师的课程。 4.无锁队列实践理论结合实践,关于无锁队列的实现有几篇文章值得一读:
//disorder_test.c #include
QUEUENODE
定义了一个具体的商品。其中有两个变量,m_flag
用于标识队列中对应位置是否存在商品,m_flag
为 1 表示生产者已经生产了商品,m_flag
为 0 表示商品还未被生产。m_data
表示商品具体的值。m_queue
为一个全局的循环队列。Push
函数向队列中放入商品,在 push
前首先判断指定位置是否存在商品,如果存在则等待(通过 while
自旋来实现),否则首先放入商品(为 m_data
赋值),再设置 m_flag
为 1。Pop
函数用于从队列中获取商品,pop
之前先判断指定位置是否存在商品,如果不存在则等待(通过while自旋来实现),否则首先取出商品(将 m_data
赋值给 goods
),再设置 m_flag
为 0。main
函数是一个死循环,每次开启两个线程,一个线程向队列中 push
商品,一个线程从队列中 pop
线程,然后等待两个线程结束,最后打印出通过 pop
获取到的商品的值,即 goods
。OK,现在用非优化编译编译该代码,并运行:gcc disorder_test.c -o disorder_test -lpthread ./disorder_test OK,看起来一切正常。现在我们换成优化编译试试:
gcc disorder_test.c -O2 -o disorder_test -lpthread ./disorder_test img程序陷入了死循环…发生了什么?现在我们来看看这段代码的汇编,首先是非优化编译版本:
gcc -S disorder_test.c cat disorder_test.s img这里我们只标注出最关键的部分,即 push 中的 while 循环。我们注意到,while 中每次循环都会执行取值和运算操作,然后才执行 testl 判断。我们再来看看优化版本。
gcc -S -O2 disorder_test.c cat disorder_test.s img这里就非常可怕了,可以看到
.L4
本身就是一个死循环,前面 testl 之后如果发现不满足条件,则直接跳进死循环。这是为什么?我们来看看 push
的代码:void* Push(void* param) { long long data = *(long long*)param; int pos = data % QUEUE_LEN; while (m_queue[pos].m_flag) ; m_queue[pos].m_data = data; m_queue[pos].m_flag = 1; return NULL; } while循环会检测m_queue[pos].m_flag,而在这个函数中,只有当m_queue[pos].m_flag为0时,循环才会跳出,执行line7及之后的代码,而在line8才会对m_flag进行修改。所以编译器认为在循环的过程中,没人会修改m_flag。既然没有修改m_flag,只要m_flag一开始的值不为0,那么m_flag就是一个不会改变的值,当然就是死循环!显然编译器并不知道另一个线程会执行pop函数,而pop会修改m_flag的值。如果观察pop的汇编代码也会发现完全相同的优化逻辑。所以,在这种情况下,就需要程序员显式的告诉编译器,m_flag是一个会发生改变的值,所以不要尝试做这样的优化。这就是volatile关键字。现在我们给m_flag加上volatile关键字:
typedef struct { volatile int m_flag; long long m_data; }QUEUENODE, LPQUEUENODE; 再次优化编译并运行程序:
gcc disorder_test.c -O2 -o disorder_test -lpthread ./disorder_test OK,一切正常!现在我们再来看看汇编代码:现在每次循环都会执行movl指令去获取m_flag的值!一切都变得美好了。
//test.c int x,y,r; void f() { x = r; y = 1; } void main() { f(); } 这次,我们直接对比非优化编译与优化编译的汇编代码。
x=
r 和 y=1
的顺序,先将 y 的值赋值为 1,再将 x 值赋值为 r。现在我们将 x,y, r 加上 volatile
关键字。volatile int x,y,r; 再次查看汇编代码:指令顺序和代码顺序一致。在 https://www.runoob.com/w3cnote/c-volatile-keyword.html 介绍 volatile 时有这样一段描述 “当使用 volatile 声明的变量的值的时候,系统总是重新从它所在的内存读取数据,即使它前面的指令刚刚从该处读取过数据”。然而,实际情况真的是每次都从内存中读取数据么?其实这只是一个笼统的说法,更为准确的说法应该是,系统不会直接从寄存器中读取 volatile 修饰的变量。因为,寄存器的读写性能远高于内存,所以在CPU寄存器和内存之前,通常有多级高速缓存。相信大家都见过这样一张著名的图,不难发现,图中,在内存与寄存器之间,存在 L1、L2、L3 这样三级缓存。所以指令在进行访存操作的时候,会首先逐级查看缓存中是否有对应的数据,如果3级缓存有没有期望的数据,才会访问内存。而通常在多核CPU中缓存是如下图所示的这样一种结构:每个 CPU core 都有自己独立的 L1 和 L2 缓存,多个 core 共享一个L3缓存,多个 CPU 有各自的 L3 缓存,多个CPU 共享内存。每个 core 都有自己独立的 L1 和 L2 缓存,缓存可以独立读写!这个就可怕了,因为这就存在不同 core 读写同一份数据的可能,如果不加任何限制,岂不天下大乱了?所以对于多核 CPU,需要一种机制来对缓存中的数据进行同步。这也就是我们接下来要讲的
MESI
。在《Memory Barriers: a Hardware View for Software Hackers》还有一个操作叫 write back(写回),是指将Cache数据写回内存。在 CSAPP 中,第4章讲到指令的6个阶段其中也有一个阶段叫write back,这里是指将执行阶段的结果写回到寄存器,这两个概念不要混淆了。MESI 是指缓存行的四种状态:I:invalid,最简单的一种状态,表示该缓存行没有数据,显然这也是缓存行的初始状态。S:shared,该缓存行中的数据被其他CPU共享。在shared状态下,缓存行为只读,不可以修改。E:exclusive,该缓存行中的数据没有被其他CPU共享,且缓存中的数据与内存中保持一致。在exclusive状态下,缓存行可以修改。M:modified,该缓存行保存了唯一一份 up-to-date 的数据。即该缓存行中的数据没有被其他CPU共享,且缓存行的数据与内存不一致。这四种状态之间是可以互相转换的,具体的转换方式在《Memory Barriers: a Hardware View for Software Hackers》一文中也有非常详细的描述(重要的是事情说三遍,这篇文章很重要!!!)。这里我们只对部分状态转换加以说明。
a = 1; b = a + 1; assert(b == 2); 如上面代码所示。首先 line2 的加法运算要使用到 line1 中的变量a,所以两行代码是存在数据相关性的,那么编译器不会尝试交换指令顺序。我们假设现在变量 a 在 CPU1 中,变量 b 在 CPU0 中,且初始值均为0。假设现在 CPU0 要执行上述代码,根据前面 MESI 的规定,上述代码的执行顺序如下:
a = 1;
这行代码不难发现,不论 CPU1 回传给 CPU0 的值是什么,我们会将 a 的值最终修改为1,那么我们真正需要等待的只是 invalidate acknowledge。那么我们是不是可以先将 a = 1;
这条指令缓存起来,继续执行后面的操作,等收到 invalidate acknowledge 之后再来真正修改 a 的值呢?答案是肯定的,如下图所示:a=1
时,如果需要等待 invalidate acknowledge,那么就先将 a=1
写入这个 store buffer
,然后继续执行后面的代码,等到收到 invalidate acknowledge 再将 store buffer 中的值写入缓存。好了,那么现在问题来了。有了store buffer之后,前面代码就可以是这样的一种执行顺序。void foo(void) { a = 1; b = 1; } void bar(void) { while (b == 0) continue; assert(a == 1); } 假设,a,b初始值为0。a 在CPU1中且为 exclusive 状态,b 在 CPU0 中且为 exclusive 状态,CPU0 执行 foo(),CPU1 执行 bar()。情况如下:
assert(a == 1);
a=1;
还没有被所有CPU的可见的时候,b=1;
已经被所有CPU都可见了。而 a=1
不可见的原因是 store buffer 中的数据还没有应用到缓存行中。解决这个问题可以有两种思路:void foo(void) { a = 1; smp_mb(); //内存屏障 b = 1; } void bar(void) { while (b == 0) continue; assert(a == 1); } 按照思路1,CPU0 执行到 line4 时,发现 store buffer 中有 a=1,于是暂停执行,直到 store buffer 中的数据应用到cache中,再继续执行 b=1。这样便没问题了。按照思路2,CPU0 执行到 line4 时,发现 store buffer中有 a=1,于是将该条目做一个标记(标记store buffer中的所有当前条目)。在执行b=1时,发现store buffer中有一个带标记的条目,于是将b=1也写入store buffer,这样b=1对于CPU1也就不可见了。只有当代标记的条目应用于缓存之后,后续条目才可以应用于缓存。这相当于只有当标记条目都应用于缓存后,后续的store操作才能进行。通过这两种方式就很好的解决了缓存可见性问题。仔细观察这个流程,其实感觉有点数据库事务的意思,哈哈,技术果然都是互通的。不难发现,内存屏障限制了CPU的执行流程,所以同样会有一定的性能损失,但是显然不满足正确性任何性能都是扯淡。
void foo(void) { a = 1; smp_mb(); //内存屏障 b = 1; } void bar(void) { while (b == 0) continue; assert(a == 1); } 假设,a,b初始值为0。a在CPU0和CPU1之前共享,状态为shared,b在CPU0中且为exclusive状态,CPU0执行foo(),CPU1执行bar()。情况如下:
void foo(void) { a = 1; smp_mb(); //内存屏障 b = 1; } void bar(void) { while (b == 0) continue; smp_mb(); //内存屏障 assert(a == 1); } 使用内存屏障后,会标记store buffer中的所有当前条目,只有当所有标记的条目都应用于缓存后,后续的load操作才能进行。
When a given CPU executes a memory barrier, it marks all the entries currently in its invalidate queue, and forces any subsequent load to wait until all marked entries have been applied to the CPU’s cache.所以在加上内存屏障之后,在执行 assert(a == 1)之前需要先将invalidate queue中的条目应用于缓存行。所以在执行
a== 1
时,CPU1 会发现 a 不在 CPU1 的缓存,从而给 CPU0 发送read消息,获得 a 的值1,最终assert(a == 1); 成功。其实在这里内存屏障还有一个非常重要的作用,因为a==1并不一定要等 b != 0时才会执行。这又是为什么?while (b == 0) continue;是一个条件循环,条件循环的本质是条件分支+无条件循环(IF+LOOP)。在执行条件分支时,为了更好的利用指令流水,有一种被称作分支预测的机制。所以实际执行的时候可能会假定条件分支的值为FALSE,从而提前执行 assert(a == 1);关于while循环和指令流水可以参见CSAPP的第三、第四章。
void foo(void) { a = 1; smp_wmb(); //写屏障 b = 1; } void bar(void) { while (b == 0) continue; smp_rmb(); //读屏障 assert(a == 1); }
当然,对于单线程开发和单核CPU也不用担心内存屏障的问题。补充:锁是如何实现的通常情况下,锁都是基于一种叫做CAS(compare-and-swap)的操作实现的。CAS的代码如下:
static __inline__ int tas(volatile slock_t *lock) { register slock_t _res = 1; __asm__ __volatile__( " lock " " xchgb %0,%1 " : "+q"(_res), "+m"(*lock) : /* no inputs */ : "memory", "cc"); return (int) _res; } 其中:xchgb 就是实现 CAS 的指令,而在 xchgb 之前有一个 lock 前缀,这个前缀的作用是锁总线,达到的效果就是内存屏障的效果。这也就是为什么使用了锁就不用担心内存屏障的问题了。而 JAVA 对于内存屏障的底层实现其实就是用的这个lock。
__kfifo_put()
函数和 __kfifo_get()
。__kfifo_put()
用于向队列中写入数据,__kfifo_get()
用于从队列中获取数据。/** * __kfifo_put - puts some data into the FIFO, no locking version * @fifo: the fifo to be used. * @buffer: the data to be added. * @len: the length of the data to be added. * * This function copies at most @len bytes from the @buffer into * the FIFO depending on the free space, and returns the number of * bytes copied. * * Note that with only one concurrent reader and one concurrent * writer, you don't need extra locking to use these functions. */ unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. * line19是读操作,line30之后是写操作(向队列中写数据),所以需要使用全屏障(隔离读和写)。 */ smp_mb(); /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. * line34是写操作,line44也是写操作,所以使用写屏障(隔离写和写)。 */ smp_wmb(); fifo->in += len; return len; } EXPORT_SYMBOL(__kfifo_put);
/** * __kfifo_get - gets some data from the FIFO, no locking version * @fifo: the fifo to be used. * @buffer: where the data must be copied. * @len: the size of the destination buffer. * * This function copies at most @len bytes from the FIFO into the * @buffer and returns the number of copied bytes. * * Note that with only one concurrent reader and one concurrent * writer, you don't need extra locking to use these functions. */ unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. * line18读操作,line29是读操作(从队列中读数据),所以需要使用读屏障(隔离读和读)。 */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. * line33是读操作,line43是写操作,所以需要使用全屏障(隔离读和写)。 */ smp_mb(); fifo->out += len; return len; } EXPORT_SYMBOL(__kfifo_get); kfifo 的详细内容,请查阅相关资料,这里不再赘述。
__kfifo_put
还是 __kfifo_get
都使用了两次内存屏障。我们以 __kfifo_put
为例子来观察下这两个内存屏障,在 __kfifo_put
中,第一次使用内存屏障是 line27 的 smp_mb 第二次是 line42 的 smp_wmb。现在思考一个问题,这两个内存屏障可以省略么?为了解决这个问题,我们需要思考,如果省略了内存屏障会有什么问题?__kfifo_get
函数修改。如果省略smp_mb
在执行line30之前,__kfifo_get
对于fifo->out
的修改对于__kfifo_put
可能不可见。不可见会造成什么后果?在__kfifo_get
中会增加fifo->out
的长度,如果这个增加不可见,那么line19的len值就会小一些(相对于可见情况),也就是说可以put的数据就少一些,除此之外并没有什么其他后果。kfifo队列依然可以正常工作。综上所述,如果省略smp_mb,会造成一些性能问题,但不会有正确性问题。审核编辑:汤梓红
全部0条评论
快来发表一下你的评论吧 !